/*
Copyright 2015 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable

import (
	"container/list"
	"context"
	"errors"
	"fmt"
	"math"
	"regexp"
	"strings"
	"time"

	btopt "cloud.google.com/go/bigtable/internal/option"
	"cloud.google.com/go/iam"
	"cloud.google.com/go/internal/optional"
	"cloud.google.com/go/longrunning"
	lroauto "cloud.google.com/go/longrunning/autogen"
	"github.com/golang/protobuf/ptypes"
	durpb "github.com/golang/protobuf/ptypes/duration"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/cloudresourcemanager/v1"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	gtransport "google.golang.org/api/transport/grpc"
	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
	"google.golang.org/genproto/protobuf/field_mask"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// adminAddr is the endpoint handed to the default client options when a
// table AdminClient is constructed.
const adminAddr = "bigtableadmin.googleapis.com:443"

// AdminClient is a client type for performing admin operations within a specific instance.
type AdminClient struct {
	// conn is the underlying gRPC connection; Close closes it.
	conn *grpc.ClientConn
	// tClient is the table admin stub bound to conn.
	tClient btapb.BigtableTableAdminClient
	// lroClient is used to wait on long-running operations returned by tClient.
	lroClient *lroauto.OperationsClient

	// project and instance identify the instance this client operates on;
	// together they form the resource-name prefix of every request.
	project, instance string

	// Metadata to be sent with each request.
	md metadata.MD
}

// NewAdminClient creates a new AdminClient for a given project and instance.
64func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) { 65 o, err := btopt.DefaultClientOptions(adminAddr, AdminScope, clientUserAgent) 66 if err != nil { 67 return nil, err 68 } 69 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 70 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 71 // Need to add scopes for long running operations (for create table & snapshots) 72 o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope)) 73 o = append(o, opts...) 74 conn, err := gtransport.Dial(ctx, o...) 75 if err != nil { 76 return nil, fmt.Errorf("dialing: %v", err) 77 } 78 79 lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn)) 80 if err != nil { 81 // This error "should not happen", since we are just reusing old connection 82 // and never actually need to dial. 83 // If this does happen, we could leak conn. However, we cannot close conn: 84 // If the user invoked the function with option.WithGRPCConn, 85 // we would close a connection that's still in use. 86 // TODO(pongad): investigate error conditions. 87 return nil, err 88 } 89 90 return &AdminClient{ 91 conn: conn, 92 tClient: btapb.NewBigtableTableAdminClient(conn), 93 lroClient: lroClient, 94 project: project, 95 instance: instance, 96 md: metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)), 97 }, nil 98} 99 100// Close closes the AdminClient. 101func (ac *AdminClient) Close() error { 102 return ac.conn.Close() 103} 104 105func (ac *AdminClient) instancePrefix() string { 106 return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance) 107} 108 109// Tables returns a list of the tables in the instance. 
110func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { 111 ctx = mergeOutgoingMetadata(ctx, ac.md) 112 prefix := ac.instancePrefix() 113 req := &btapb.ListTablesRequest{ 114 Parent: prefix, 115 } 116 117 var res *btapb.ListTablesResponse 118 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 119 var err error 120 res, err = ac.tClient.ListTables(ctx, req) 121 return err 122 }, retryOptions...) 123 if err != nil { 124 return nil, err 125 } 126 127 names := make([]string, 0, len(res.Tables)) 128 for _, tbl := range res.Tables { 129 names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/")) 130 } 131 return names, nil 132} 133 134// TableConf contains all of the information necessary to create a table with column families. 135type TableConf struct { 136 TableID string 137 SplitKeys []string 138 // Families is a map from family name to GCPolicy 139 Families map[string]GCPolicy 140} 141 142// CreateTable creates a new table in the instance. 143// This method may return before the table's creation is complete. 144func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { 145 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table}) 146} 147 148// CreatePresplitTable creates a new table in the instance. 149// The list of row keys will be used to initially split the table into multiple tablets. 150// Given two split keys, "s1" and "s2", three tablets will be created, 151// spanning the key ranges: [, s1), [s1, s2), [s2, ). 152// This method may return before the table's creation is complete. 153func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error { 154 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys}) 155} 156 157// CreateTableFromConf creates a new table in the instance from the given configuration. 
158func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error { 159 ctx = mergeOutgoingMetadata(ctx, ac.md) 160 var reqSplits []*btapb.CreateTableRequest_Split 161 for _, split := range conf.SplitKeys { 162 reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)}) 163 } 164 var tbl btapb.Table 165 if conf.Families != nil { 166 tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) 167 for fam, policy := range conf.Families { 168 tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()} 169 } 170 } 171 prefix := ac.instancePrefix() 172 req := &btapb.CreateTableRequest{ 173 Parent: prefix, 174 TableId: conf.TableID, 175 Table: &tbl, 176 InitialSplits: reqSplits, 177 } 178 _, err := ac.tClient.CreateTable(ctx, req) 179 return err 180} 181 182// CreateColumnFamily creates a new column family in a table. 183func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error { 184 // TODO(dsymonds): Permit specifying gcexpr and any other family settings. 185 ctx = mergeOutgoingMetadata(ctx, ac.md) 186 prefix := ac.instancePrefix() 187 req := &btapb.ModifyColumnFamiliesRequest{ 188 Name: prefix + "/tables/" + table, 189 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 190 Id: family, 191 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, 192 }}, 193 } 194 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 195 return err 196} 197 198// DeleteTable deletes a table and all of its data. 199func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error { 200 ctx = mergeOutgoingMetadata(ctx, ac.md) 201 prefix := ac.instancePrefix() 202 req := &btapb.DeleteTableRequest{ 203 Name: prefix + "/tables/" + table, 204 } 205 _, err := ac.tClient.DeleteTable(ctx, req) 206 return err 207} 208 209// DeleteColumnFamily deletes a column family in a table and all of its data. 
210func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error { 211 ctx = mergeOutgoingMetadata(ctx, ac.md) 212 prefix := ac.instancePrefix() 213 req := &btapb.ModifyColumnFamiliesRequest{ 214 Name: prefix + "/tables/" + table, 215 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 216 Id: family, 217 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true}, 218 }}, 219 } 220 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 221 return err 222} 223 224// TableInfo represents information about a table. 225type TableInfo struct { 226 // DEPRECATED - This field is deprecated. Please use FamilyInfos instead. 227 Families []string 228 FamilyInfos []FamilyInfo 229} 230 231// FamilyInfo represents information about a column family. 232type FamilyInfo struct { 233 Name string 234 GCPolicy string 235} 236 237// TableInfo retrieves information about a table. 238func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) { 239 ctx = mergeOutgoingMetadata(ctx, ac.md) 240 prefix := ac.instancePrefix() 241 req := &btapb.GetTableRequest{ 242 Name: prefix + "/tables/" + table, 243 } 244 245 var res *btapb.Table 246 247 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 248 var err error 249 res, err = ac.tClient.GetTable(ctx, req) 250 return err 251 }, retryOptions...) 252 if err != nil { 253 return nil, err 254 } 255 256 ti := &TableInfo{} 257 for name, fam := range res.ColumnFamilies { 258 ti.Families = append(ti.Families, name) 259 ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)}) 260 } 261 return ti, nil 262} 263 264// SetGCPolicy specifies which cells in a column family should be garbage collected. 265// GC executes opportunistically in the background; table reads may return data 266// matching the GC policy. 
267func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { 268 ctx = mergeOutgoingMetadata(ctx, ac.md) 269 prefix := ac.instancePrefix() 270 req := &btapb.ModifyColumnFamiliesRequest{ 271 Name: prefix + "/tables/" + table, 272 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 273 Id: family, 274 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}}, 275 }}, 276 } 277 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 278 return err 279} 280 281// DropRowRange permanently deletes a row range from the specified table. 282func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error { 283 ctx = mergeOutgoingMetadata(ctx, ac.md) 284 prefix := ac.instancePrefix() 285 req := &btapb.DropRowRangeRequest{ 286 Name: prefix + "/tables/" + table, 287 Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)}, 288 } 289 _, err := ac.tClient.DropRowRange(ctx, req) 290 return err 291} 292 293// DropAllRows permanently deletes all rows from the specified table. 294func (ac *AdminClient) DropAllRows(ctx context.Context, table string) error { 295 ctx = mergeOutgoingMetadata(ctx, ac.md) 296 prefix := ac.instancePrefix() 297 req := &btapb.DropRowRangeRequest{ 298 Name: prefix + "/tables/" + table, 299 Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true}, 300 } 301 _, err := ac.tClient.DropRowRange(ctx, req) 302 return err 303} 304 305// CreateTableFromSnapshot creates a table from snapshot. 306// The table will be created in the same cluster as the snapshot. 307// 308// This is a private alpha release of Cloud Bigtable snapshots. This feature 309// is not currently available to most Cloud Bigtable customers. This feature 310// might be changed in backward-incompatible ways and is not recommended for 311// production use. It is not subject to any SLA or deprecation policy. 
312func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error { 313 ctx = mergeOutgoingMetadata(ctx, ac.md) 314 prefix := ac.instancePrefix() 315 snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot 316 317 req := &btapb.CreateTableFromSnapshotRequest{ 318 Parent: prefix, 319 TableId: table, 320 SourceSnapshot: snapshotPath, 321 } 322 op, err := ac.tClient.CreateTableFromSnapshot(ctx, req) 323 if err != nil { 324 return err 325 } 326 resp := btapb.Table{} 327 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 328} 329 330// DefaultSnapshotDuration is the default TTL for a snapshot. 331const DefaultSnapshotDuration time.Duration = 0 332 333// SnapshotTable creates a new snapshot in the specified cluster from the 334// specified source table. Setting the TTL to `DefaultSnapshotDuration` will 335// use the server side default for the duration. 336// 337// This is a private alpha release of Cloud Bigtable snapshots. This feature 338// is not currently available to most Cloud Bigtable customers. This feature 339// might be changed in backward-incompatible ways and is not recommended for 340// production use. It is not subject to any SLA or deprecation policy. 
341func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error { 342 ctx = mergeOutgoingMetadata(ctx, ac.md) 343 prefix := ac.instancePrefix() 344 345 var ttlProto *durpb.Duration 346 347 if ttl > 0 { 348 ttlProto = ptypes.DurationProto(ttl) 349 } 350 351 req := &btapb.SnapshotTableRequest{ 352 Name: prefix + "/tables/" + table, 353 Cluster: prefix + "/clusters/" + cluster, 354 SnapshotId: snapshot, 355 Ttl: ttlProto, 356 } 357 358 op, err := ac.tClient.SnapshotTable(ctx, req) 359 if err != nil { 360 return err 361 } 362 resp := btapb.Snapshot{} 363 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 364} 365 366// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster. 367// To list snapshots across all of the clusters in the instance specify "-" as the cluster. 368// 369// This is a private alpha release of Cloud Bigtable snapshots. This feature is not 370// currently available to most Cloud Bigtable customers. This feature might be 371// changed in backward-incompatible ways and is not recommended for production use. 372// It is not subject to any SLA or deprecation policy. 373func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator { 374 ctx = mergeOutgoingMetadata(ctx, ac.md) 375 prefix := ac.instancePrefix() 376 clusterPath := prefix + "/clusters/" + cluster 377 378 it := &SnapshotIterator{} 379 req := &btapb.ListSnapshotsRequest{ 380 Parent: clusterPath, 381 } 382 383 fetch := func(pageSize int, pageToken string) (string, error) { 384 req.PageToken = pageToken 385 if pageSize > math.MaxInt32 { 386 req.PageSize = math.MaxInt32 387 } else { 388 req.PageSize = int32(pageSize) 389 } 390 391 var resp *btapb.ListSnapshotsResponse 392 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 393 var err error 394 resp, err = ac.tClient.ListSnapshots(ctx, req) 395 return err 396 }, retryOptions...) 
397 if err != nil { 398 return "", err 399 } 400 for _, s := range resp.Snapshots { 401 snapshotInfo, err := newSnapshotInfo(s) 402 if err != nil { 403 return "", fmt.Errorf("failed to parse snapshot proto %v", err) 404 } 405 it.items = append(it.items, snapshotInfo) 406 } 407 return resp.NextPageToken, nil 408 } 409 bufLen := func() int { return len(it.items) } 410 takeBuf := func() interface{} { b := it.items; it.items = nil; return b } 411 412 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 413 414 return it 415} 416 417func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) { 418 nameParts := strings.Split(snapshot.Name, "/") 419 name := nameParts[len(nameParts)-1] 420 tablePathParts := strings.Split(snapshot.SourceTable.Name, "/") 421 tableID := tablePathParts[len(tablePathParts)-1] 422 423 createTime, err := ptypes.Timestamp(snapshot.CreateTime) 424 if err != nil { 425 return nil, fmt.Errorf("invalid createTime: %v", err) 426 } 427 428 deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime) 429 if err != nil { 430 return nil, fmt.Errorf("invalid deleteTime: %v", err) 431 } 432 433 return &SnapshotInfo{ 434 Name: name, 435 SourceTable: tableID, 436 DataSize: snapshot.DataSizeBytes, 437 CreateTime: createTime, 438 DeleteTime: deleteTime, 439 }, nil 440} 441 442// SnapshotIterator is an EntryIterator that iterates over log entries. 443// 444// This is a private alpha release of Cloud Bigtable snapshots. This feature 445// is not currently available to most Cloud Bigtable customers. This feature 446// might be changed in backward-incompatible ways and is not recommended for 447// production use. It is not subject to any SLA or deprecation policy. 448type SnapshotIterator struct { 449 items []*SnapshotInfo 450 pageInfo *iterator.PageInfo 451 nextFunc func() error 452} 453 454// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 
455func (it *SnapshotIterator) PageInfo() *iterator.PageInfo { 456 return it.pageInfo 457} 458 459// Next returns the next result. Its second return value is iterator.Done 460// (https://godoc.org/google.golang.org/api/iterator) if there are no more 461// results. Once Next returns Done, all subsequent calls will return Done. 462func (it *SnapshotIterator) Next() (*SnapshotInfo, error) { 463 if err := it.nextFunc(); err != nil { 464 return nil, err 465 } 466 item := it.items[0] 467 it.items = it.items[1:] 468 return item, nil 469} 470 471// SnapshotInfo contains snapshot metadata. 472type SnapshotInfo struct { 473 Name string 474 SourceTable string 475 DataSize int64 476 CreateTime time.Time 477 DeleteTime time.Time 478} 479 480// SnapshotInfo gets snapshot metadata. 481// 482// This is a private alpha release of Cloud Bigtable snapshots. This feature 483// is not currently available to most Cloud Bigtable customers. This feature 484// might be changed in backward-incompatible ways and is not recommended for 485// production use. It is not subject to any SLA or deprecation policy. 486func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) { 487 ctx = mergeOutgoingMetadata(ctx, ac.md) 488 prefix := ac.instancePrefix() 489 clusterPath := prefix + "/clusters/" + cluster 490 snapshotPath := clusterPath + "/snapshots/" + snapshot 491 492 req := &btapb.GetSnapshotRequest{ 493 Name: snapshotPath, 494 } 495 496 var resp *btapb.Snapshot 497 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 498 var err error 499 resp, err = ac.tClient.GetSnapshot(ctx, req) 500 return err 501 }, retryOptions...) 502 if err != nil { 503 return nil, err 504 } 505 506 return newSnapshotInfo(resp) 507} 508 509// DeleteSnapshot deletes a snapshot in a cluster. 510// 511// This is a private alpha release of Cloud Bigtable snapshots. This feature 512// is not currently available to most Cloud Bigtable customers. 
This feature 513// might be changed in backward-incompatible ways and is not recommended for 514// production use. It is not subject to any SLA or deprecation policy. 515func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error { 516 ctx = mergeOutgoingMetadata(ctx, ac.md) 517 prefix := ac.instancePrefix() 518 clusterPath := prefix + "/clusters/" + cluster 519 snapshotPath := clusterPath + "/snapshots/" + snapshot 520 521 req := &btapb.DeleteSnapshotRequest{ 522 Name: snapshotPath, 523 } 524 _, err := ac.tClient.DeleteSnapshot(ctx, req) 525 return err 526} 527 528// getConsistencyToken gets the consistency token for a table. 529func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) { 530 req := &btapb.GenerateConsistencyTokenRequest{ 531 Name: tableName, 532 } 533 resp, err := ac.tClient.GenerateConsistencyToken(ctx, req) 534 if err != nil { 535 return "", err 536 } 537 return resp.GetConsistencyToken(), nil 538} 539 540// isConsistent checks if a token is consistent for a table. 541func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) { 542 req := &btapb.CheckConsistencyRequest{ 543 Name: tableName, 544 ConsistencyToken: token, 545 } 546 var resp *btapb.CheckConsistencyResponse 547 548 // Retry calls on retryable errors to avoid losing the token gathered before. 549 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 550 var err error 551 resp, err = ac.tClient.CheckConsistency(ctx, req) 552 return err 553 }, retryOptions...) 554 if err != nil { 555 return false, err 556 } 557 return resp.GetConsistent(), nil 558} 559 560// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication. 
561func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error { 562 ctx = mergeOutgoingMetadata(ctx, ac.md) 563 // Get the token. 564 prefix := ac.instancePrefix() 565 tableName := prefix + "/tables/" + table 566 token, err := ac.getConsistencyToken(ctx, tableName) 567 if err != nil { 568 return err 569 } 570 571 // Periodically check if the token is consistent. 572 timer := time.NewTicker(time.Second * 10) 573 defer timer.Stop() 574 for { 575 consistent, err := ac.isConsistent(ctx, tableName, token) 576 if err != nil { 577 return err 578 } 579 if consistent { 580 return nil 581 } 582 // Sleep for a bit or until the ctx is cancelled. 583 select { 584 case <-ctx.Done(): 585 return ctx.Err() 586 case <-timer.C: 587 } 588 } 589} 590 591// TableIAM creates an IAM client specific to a given Instance and Table within the configured project. 592func (ac *AdminClient) TableIAM(tableID string) *iam.Handle { 593 return iam.InternalNewHandleGRPCClient(ac.tClient, 594 "projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID) 595} 596 597const instanceAdminAddr = "bigtableadmin.googleapis.com:443" 598 599// InstanceAdminClient is a client type for performing admin operations on instances. 600// These operations can be substantially more dangerous than those provided by AdminClient. 601type InstanceAdminClient struct { 602 conn *grpc.ClientConn 603 iClient btapb.BigtableInstanceAdminClient 604 lroClient *lroauto.OperationsClient 605 606 project string 607 608 // Metadata to be sent with each request. 609 md metadata.MD 610} 611 612// NewInstanceAdminClient creates a new InstanceAdminClient for a given project. 
613func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) { 614 o, err := btopt.DefaultClientOptions(instanceAdminAddr, InstanceAdminScope, clientUserAgent) 615 if err != nil { 616 return nil, err 617 } 618 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 619 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 620 o = append(o, opts...) 621 conn, err := gtransport.Dial(ctx, o...) 622 if err != nil { 623 return nil, fmt.Errorf("dialing: %v", err) 624 } 625 626 lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn)) 627 if err != nil { 628 // This error "should not happen", since we are just reusing old connection 629 // and never actually need to dial. 630 // If this does happen, we could leak conn. However, we cannot close conn: 631 // If the user invoked the function with option.WithGRPCConn, 632 // we would close a connection that's still in use. 633 // TODO(pongad): investigate error conditions. 634 return nil, err 635 } 636 637 return &InstanceAdminClient{ 638 conn: conn, 639 iClient: btapb.NewBigtableInstanceAdminClient(conn), 640 lroClient: lroClient, 641 642 project: project, 643 md: metadata.Pairs(resourcePrefixHeader, "projects/"+project), 644 }, nil 645} 646 647// Close closes the InstanceAdminClient. 
648func (iac *InstanceAdminClient) Close() error { 649 return iac.conn.Close() 650} 651 652// StorageType is the type of storage used for all tables in an instance 653type StorageType int 654 655const ( 656 SSD StorageType = iota 657 HDD 658) 659 660func (st StorageType) proto() btapb.StorageType { 661 if st == HDD { 662 return btapb.StorageType_HDD 663 } 664 return btapb.StorageType_SSD 665} 666 667func storageTypeFromProto(st btapb.StorageType) StorageType { 668 if st == btapb.StorageType_HDD { 669 return HDD 670 } 671 672 return SSD 673} 674 675// InstanceType is the type of the instance 676type InstanceType int32 677 678const ( 679 // UNSPECIFIED instance types default to PRODUCTION 680 UNSPECIFIED InstanceType = InstanceType(btapb.Instance_TYPE_UNSPECIFIED) 681 PRODUCTION = InstanceType(btapb.Instance_PRODUCTION) 682 DEVELOPMENT = InstanceType(btapb.Instance_DEVELOPMENT) 683) 684 685// InstanceInfo represents information about an instance 686type InstanceInfo struct { 687 Name string // name of the instance 688 DisplayName string // display name for UIs 689 InstanceType InstanceType 690} 691 692// InstanceConf contains the information necessary to create an Instance 693type InstanceConf struct { 694 InstanceId, DisplayName, ClusterId, Zone string 695 // NumNodes must not be specified for DEVELOPMENT instance types 696 NumNodes int32 697 StorageType StorageType 698 InstanceType InstanceType 699} 700 701// InstanceWithClustersConfig contains the information necessary to create an Instance 702type InstanceWithClustersConfig struct { 703 InstanceID, DisplayName string 704 Clusters []ClusterConfig 705 InstanceType InstanceType 706} 707 708var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`) 709 710// CreateInstance creates a new instance in the project. 711// This method will return when the instance has been created or when an error occurs. 
712func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error { 713 ctx = mergeOutgoingMetadata(ctx, iac.md) 714 newConfig := InstanceWithClustersConfig{ 715 InstanceID: conf.InstanceId, 716 DisplayName: conf.DisplayName, 717 InstanceType: conf.InstanceType, 718 Clusters: []ClusterConfig{ 719 { 720 InstanceID: conf.InstanceId, 721 ClusterID: conf.ClusterId, 722 Zone: conf.Zone, 723 NumNodes: conf.NumNodes, 724 StorageType: conf.StorageType, 725 }, 726 }, 727 } 728 return iac.CreateInstanceWithClusters(ctx, &newConfig) 729} 730 731// CreateInstanceWithClusters creates a new instance with configured clusters in the project. 732// This method will return when the instance has been created or when an error occurs. 733func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 734 ctx = mergeOutgoingMetadata(ctx, iac.md) 735 clusters := make(map[string]*btapb.Cluster) 736 for _, cluster := range conf.Clusters { 737 clusters[cluster.ClusterID] = cluster.proto(iac.project) 738 } 739 740 req := &btapb.CreateInstanceRequest{ 741 Parent: "projects/" + iac.project, 742 InstanceId: conf.InstanceID, 743 Instance: &btapb.Instance{DisplayName: conf.DisplayName, Type: btapb.Instance_Type(conf.InstanceType)}, 744 Clusters: clusters, 745 } 746 747 lro, err := iac.iClient.CreateInstance(ctx, req) 748 if err != nil { 749 return err 750 } 751 resp := btapb.Instance{} 752 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 753} 754 755// updateInstance updates a single instance based on config fields that operate 756// at an instance level: DisplayName and InstanceType. 
757func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *InstanceWithClustersConfig) (updated bool, err error) { 758 if conf.InstanceID == "" { 759 return false, errors.New("InstanceID is required") 760 } 761 762 // Update the instance, if necessary 763 mask := &field_mask.FieldMask{} 764 ireq := &btapb.PartialUpdateInstanceRequest{ 765 Instance: &btapb.Instance{ 766 Name: "projects/" + iac.project + "/instances/" + conf.InstanceID, 767 }, 768 UpdateMask: mask, 769 } 770 if conf.DisplayName != "" { 771 ireq.Instance.DisplayName = conf.DisplayName 772 mask.Paths = append(mask.Paths, "display_name") 773 } 774 if btapb.Instance_Type(conf.InstanceType) != btapb.Instance_TYPE_UNSPECIFIED { 775 ireq.Instance.Type = btapb.Instance_Type(conf.InstanceType) 776 mask.Paths = append(mask.Paths, "type") 777 } 778 779 if len(mask.Paths) == 0 { 780 return false, nil 781 } 782 783 lro, err := iac.iClient.PartialUpdateInstance(ctx, ireq) 784 if err != nil { 785 return false, err 786 } 787 err = longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 788 if err != nil { 789 return false, err 790 } 791 792 return true, nil 793} 794 795// UpdateInstanceWithClusters updates an instance and its clusters. Updateable 796// fields are instance display name, instance type and cluster size. 797// The provided InstanceWithClustersConfig is used as follows: 798// - InstanceID is required 799// - DisplayName and InstanceType are updated only if they are not empty 800// - ClusterID is required for any provided cluster 801// - All other cluster fields are ignored except for NumNodes, which if set will be updated 802// 803// This method may return an error after partially succeeding, for example if the instance is updated 804// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to 805// determine the current state. 
806func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 807 ctx = mergeOutgoingMetadata(ctx, iac.md) 808 809 for _, cluster := range conf.Clusters { 810 if cluster.ClusterID == "" { 811 return errors.New("ClusterID is required for every cluster") 812 } 813 } 814 815 updatedInstance, err := iac.updateInstance(ctx, conf) 816 if err != nil { 817 return err 818 } 819 820 // Update any clusters 821 for _, cluster := range conf.Clusters { 822 err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes) 823 if err != nil { 824 if updatedInstance { 825 // We updated the instance, so note that in the error message. 826 return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded", 827 cluster.ClusterID, err) 828 } 829 return err 830 } 831 } 832 833 return nil 834} 835 836// DeleteInstance deletes an instance from the project. 837func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error { 838 ctx = mergeOutgoingMetadata(ctx, iac.md) 839 req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID} 840 _, err := iac.iClient.DeleteInstance(ctx, req) 841 return err 842} 843 844// Instances returns a list of instances in the project. 845func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) { 846 ctx = mergeOutgoingMetadata(ctx, iac.md) 847 req := &btapb.ListInstancesRequest{ 848 Parent: "projects/" + iac.project, 849 } 850 var res *btapb.ListInstancesResponse 851 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 852 var err error 853 res, err = iac.iClient.ListInstances(ctx, req) 854 return err 855 }, retryOptions...) 856 if err != nil { 857 return nil, err 858 } 859 if len(res.FailedLocations) > 0 { 860 // We don't have a good way to return a partial result in the face of some zones being unavailable. 861 // Fail the entire request. 
862 return nil, status.Errorf(codes.Unavailable, "Failed locations: %v", res.FailedLocations) 863 } 864 865 var is []*InstanceInfo 866 for _, i := range res.Instances { 867 m := instanceNameRegexp.FindStringSubmatch(i.Name) 868 if m == nil { 869 return nil, fmt.Errorf("malformed instance name %q", i.Name) 870 } 871 is = append(is, &InstanceInfo{ 872 Name: m[2], 873 DisplayName: i.DisplayName, 874 InstanceType: InstanceType(i.Type), 875 }) 876 } 877 return is, nil 878} 879 880// InstanceInfo returns information about an instance. 881func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) { 882 ctx = mergeOutgoingMetadata(ctx, iac.md) 883 req := &btapb.GetInstanceRequest{ 884 Name: "projects/" + iac.project + "/instances/" + instanceID, 885 } 886 var res *btapb.Instance 887 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 888 var err error 889 res, err = iac.iClient.GetInstance(ctx, req) 890 return err 891 }, retryOptions...) 892 if err != nil { 893 return nil, err 894 } 895 896 m := instanceNameRegexp.FindStringSubmatch(res.Name) 897 if m == nil { 898 return nil, fmt.Errorf("malformed instance name %q", res.Name) 899 } 900 return &InstanceInfo{ 901 Name: m[2], 902 DisplayName: res.DisplayName, 903 InstanceType: InstanceType(res.Type), 904 }, nil 905} 906 907// ClusterConfig contains the information necessary to create a cluster 908type ClusterConfig struct { 909 InstanceID, ClusterID, Zone string 910 NumNodes int32 911 StorageType StorageType 912} 913 914func (cc *ClusterConfig) proto(project string) *btapb.Cluster { 915 return &btapb.Cluster{ 916 ServeNodes: cc.NumNodes, 917 DefaultStorageType: cc.StorageType.proto(), 918 Location: "projects/" + project + "/locations/" + cc.Zone, 919 } 920} 921 922// ClusterInfo represents information about a cluster. 923type ClusterInfo struct { 924 Name string // name of the cluster 925 Zone string // GCP zone of the cluster (e.g. 
"us-central1-a") 926 ServeNodes int // number of allocated serve nodes 927 State string // state of the cluster 928 StorageType StorageType // the storage type of the cluster 929} 930 931// CreateCluster creates a new cluster in an instance. 932// This method will return when the cluster has been created or when an error occurs. 933func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error { 934 ctx = mergeOutgoingMetadata(ctx, iac.md) 935 936 req := &btapb.CreateClusterRequest{ 937 Parent: "projects/" + iac.project + "/instances/" + conf.InstanceID, 938 ClusterId: conf.ClusterID, 939 Cluster: conf.proto(iac.project), 940 } 941 942 lro, err := iac.iClient.CreateCluster(ctx, req) 943 if err != nil { 944 return err 945 } 946 resp := btapb.Cluster{} 947 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 948} 949 950// DeleteCluster deletes a cluster from an instance. 951func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error { 952 ctx = mergeOutgoingMetadata(ctx, iac.md) 953 req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 954 _, err := iac.iClient.DeleteCluster(ctx, req) 955 return err 956} 957 958// UpdateCluster updates attributes of a cluster 959func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error { 960 ctx = mergeOutgoingMetadata(ctx, iac.md) 961 cluster := &btapb.Cluster{ 962 Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID, 963 ServeNodes: serveNodes} 964 lro, err := iac.iClient.UpdateCluster(ctx, cluster) 965 if err != nil { 966 return err 967 } 968 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 969} 970 971// Clusters lists the clusters in an instance. 
972func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) { 973 ctx = mergeOutgoingMetadata(ctx, iac.md) 974 req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID} 975 var res *btapb.ListClustersResponse 976 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 977 var err error 978 res, err = iac.iClient.ListClusters(ctx, req) 979 return err 980 }, retryOptions...) 981 if err != nil { 982 return nil, err 983 } 984 // TODO(garyelliott): Deal with failed_locations. 985 var cis []*ClusterInfo 986 for _, c := range res.Clusters { 987 nameParts := strings.Split(c.Name, "/") 988 locParts := strings.Split(c.Location, "/") 989 cis = append(cis, &ClusterInfo{ 990 Name: nameParts[len(nameParts)-1], 991 Zone: locParts[len(locParts)-1], 992 ServeNodes: int(c.ServeNodes), 993 State: c.State.String(), 994 StorageType: storageTypeFromProto(c.DefaultStorageType), 995 }) 996 } 997 return cis, nil 998} 999 1000// GetCluster fetches a cluster in an instance 1001func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) { 1002 ctx = mergeOutgoingMetadata(ctx, iac.md) 1003 req := &btapb.GetClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 1004 var c *btapb.Cluster 1005 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1006 var err error 1007 c, err = iac.iClient.GetCluster(ctx, req) 1008 return err 1009 }, retryOptions...) 
1010 if err != nil { 1011 return nil, err 1012 } 1013 1014 nameParts := strings.Split(c.Name, "/") 1015 locParts := strings.Split(c.Location, "/") 1016 cis := &ClusterInfo{ 1017 Name: nameParts[len(nameParts)-1], 1018 Zone: locParts[len(locParts)-1], 1019 ServeNodes: int(c.ServeNodes), 1020 State: c.State.String(), 1021 StorageType: storageTypeFromProto(c.DefaultStorageType), 1022 } 1023 return cis, nil 1024} 1025 1026// InstanceIAM returns the instance's IAM handle. 1027func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle { 1028 return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID) 1029} 1030 1031// Routing policies. 1032const ( 1033 // MultiClusterRouting is a policy that allows read/write requests to be 1034 // routed to any cluster in the instance. Requests will will fail over to 1035 // another cluster in the event of transient errors or delays. Choosing 1036 // this option sacrifices read-your-writes consistency to improve 1037 // availability. 1038 MultiClusterRouting = "multi_cluster_routing_use_any" 1039 // SingleClusterRouting is a policy that unconditionally routes all 1040 // read/write requests to a specific cluster. This option preserves 1041 // read-your-writes consistency, but does not improve availability. 1042 SingleClusterRouting = "single_cluster_routing" 1043) 1044 1045// ProfileConf contains the information necessary to create an profile 1046type ProfileConf struct { 1047 Name string 1048 ProfileID string 1049 InstanceID string 1050 Etag string 1051 Description string 1052 RoutingPolicy string 1053 ClusterID string 1054 AllowTransactionalWrites bool 1055 1056 // If true, warnings are ignored 1057 IgnoreWarnings bool 1058} 1059 1060// ProfileIterator iterates over profiles. 
1061type ProfileIterator struct { 1062 items []*btapb.AppProfile 1063 pageInfo *iterator.PageInfo 1064 nextFunc func() error 1065} 1066 1067// ProfileAttrsToUpdate define addrs to update during an Update call. If unset, no fields will be replaced. 1068type ProfileAttrsToUpdate struct { 1069 // If set, updates the description. 1070 Description optional.String 1071 1072 //If set, updates the routing policy. 1073 RoutingPolicy optional.String 1074 1075 //If RoutingPolicy is updated to SingleClusterRouting, set these fields as well. 1076 ClusterID string 1077 AllowTransactionalWrites bool 1078 1079 // If true, warnings are ignored 1080 IgnoreWarnings bool 1081} 1082 1083// GetFieldMaskPath returns the field mask path. 1084func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string { 1085 path := make([]string, 0) 1086 if p.Description != nil { 1087 path = append(path, "description") 1088 } 1089 1090 if p.RoutingPolicy != nil { 1091 path = append(path, optional.ToString(p.RoutingPolicy)) 1092 } 1093 return path 1094} 1095 1096// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 1097func (it *ProfileIterator) PageInfo() *iterator.PageInfo { 1098 return it.pageInfo 1099} 1100 1101// Next returns the next result. Its second return value is iterator.Done 1102// (https://godoc.org/google.golang.org/api/iterator) if there are no more 1103// results. Once Next returns Done, all subsequent calls will return Done. 1104func (it *ProfileIterator) Next() (*btapb.AppProfile, error) { 1105 if err := it.nextFunc(); err != nil { 1106 return nil, err 1107 } 1108 item := it.items[0] 1109 it.items = it.items[1:] 1110 return item, nil 1111} 1112 1113// CreateAppProfile creates an app profile within an instance. 
1114func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) { 1115 ctx = mergeOutgoingMetadata(ctx, iac.md) 1116 parent := "projects/" + iac.project + "/instances/" + profile.InstanceID 1117 appProfile := &btapb.AppProfile{ 1118 Etag: profile.Etag, 1119 Description: profile.Description, 1120 } 1121 1122 if profile.RoutingPolicy == "" { 1123 return nil, errors.New("invalid routing policy") 1124 } 1125 1126 switch profile.RoutingPolicy { 1127 case MultiClusterRouting: 1128 appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1129 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1130 } 1131 case SingleClusterRouting: 1132 appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1133 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1134 ClusterId: profile.ClusterID, 1135 AllowTransactionalWrites: profile.AllowTransactionalWrites, 1136 }, 1137 } 1138 default: 1139 return nil, errors.New("invalid routing policy") 1140 } 1141 1142 return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{ 1143 Parent: parent, 1144 AppProfile: appProfile, 1145 AppProfileId: profile.ProfileID, 1146 IgnoreWarnings: profile.IgnoreWarnings, 1147 }) 1148} 1149 1150// GetAppProfile gets information about an app profile. 1151func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) { 1152 ctx = mergeOutgoingMetadata(ctx, iac.md) 1153 profileRequest := &btapb.GetAppProfileRequest{ 1154 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1155 } 1156 var ap *btapb.AppProfile 1157 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1158 var err error 1159 ap, err = iac.iClient.GetAppProfile(ctx, profileRequest) 1160 return err 1161 }, retryOptions...) 
1162 if err != nil { 1163 return nil, err 1164 } 1165 return ap, err 1166} 1167 1168// ListAppProfiles lists information about app profiles in an instance. 1169func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator { 1170 ctx = mergeOutgoingMetadata(ctx, iac.md) 1171 listRequest := &btapb.ListAppProfilesRequest{ 1172 Parent: "projects/" + iac.project + "/instances/" + instanceID, 1173 } 1174 1175 pit := &ProfileIterator{} 1176 fetch := func(pageSize int, pageToken string) (string, error) { 1177 listRequest.PageToken = pageToken 1178 var profileRes *btapb.ListAppProfilesResponse 1179 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1180 var err error 1181 profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest) 1182 return err 1183 }, retryOptions...) 1184 if err != nil { 1185 return "", err 1186 } 1187 1188 pit.items = append(pit.items, profileRes.AppProfiles...) 1189 return profileRes.NextPageToken, nil 1190 } 1191 1192 bufLen := func() int { return len(pit.items) } 1193 takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b } 1194 pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 1195 return pit 1196 1197} 1198 1199// UpdateAppProfile updates an app profile within an instance. 1200// updateAttrs should be set. If unset, all fields will be replaced. 
1201func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error { 1202 ctx = mergeOutgoingMetadata(ctx, iac.md) 1203 1204 profile := &btapb.AppProfile{ 1205 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID, 1206 } 1207 1208 if updateAttrs.Description != nil { 1209 profile.Description = optional.ToString(updateAttrs.Description) 1210 } 1211 if updateAttrs.RoutingPolicy != nil { 1212 switch optional.ToString(updateAttrs.RoutingPolicy) { 1213 case MultiClusterRouting: 1214 profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1215 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1216 } 1217 case SingleClusterRouting: 1218 profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1219 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1220 ClusterId: updateAttrs.ClusterID, 1221 AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites, 1222 }, 1223 } 1224 default: 1225 return errors.New("invalid routing policy") 1226 } 1227 } 1228 patchRequest := &btapb.UpdateAppProfileRequest{ 1229 AppProfile: profile, 1230 UpdateMask: &field_mask.FieldMask{ 1231 Paths: updateAttrs.GetFieldMaskPath(), 1232 }, 1233 IgnoreWarnings: updateAttrs.IgnoreWarnings, 1234 } 1235 updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest) 1236 if err != nil { 1237 return err 1238 } 1239 1240 return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil) 1241 1242} 1243 1244// DeleteAppProfile deletes an app profile from an instance. 
1245func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error { 1246 ctx = mergeOutgoingMetadata(ctx, iac.md) 1247 deleteProfileRequest := &btapb.DeleteAppProfileRequest{ 1248 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1249 IgnoreWarnings: true, 1250 } 1251 _, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest) 1252 return err 1253 1254} 1255 1256// UpdateInstanceResults contains information about the 1257// changes made after invoking UpdateInstanceAndSyncClusters. 1258type UpdateInstanceResults struct { 1259 InstanceUpdated bool 1260 CreatedClusters []string 1261 DeletedClusters []string 1262 UpdatedClusters []string 1263} 1264 1265func (r *UpdateInstanceResults) String() string { 1266 return fmt.Sprintf("Instance updated? %v Clusters added:%v Clusters deleted:%v Clusters updated:%v", 1267 r.InstanceUpdated, r.CreatedClusters, r.DeletedClusters, r.UpdatedClusters) 1268} 1269 1270func max(x, y int) int { 1271 if x > y { 1272 return x 1273 } 1274 return y 1275} 1276 1277// UpdateInstanceAndSyncClusters updates an instance and its clusters, and will synchronize the 1278// clusters in the instance with the provided clusters, creating and deleting them as necessary. 1279// The provided InstanceWithClustersConfig is used as follows: 1280// - InstanceID is required 1281// - DisplayName and InstanceType are updated only if they are not empty 1282// - ClusterID is required for any provided cluster 1283// - Any cluster present in conf.Clusters but not part of the instance will be created using CreateCluster 1284// and the given ClusterConfig. 1285// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance 1286// using DeleteCluster. 1287// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the 1288// provided number of nodes if set. 
1289// 1290// This method may return an error after partially succeeding, for example if the instance is updated 1291// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to 1292// determine the current state. The return UpdateInstanceResults will describe the work done by the 1293// method, whether partial or complete. 1294func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient, conf *InstanceWithClustersConfig) (*UpdateInstanceResults, error) { 1295 ctx = mergeOutgoingMetadata(ctx, iac.md) 1296 1297 // First fetch the existing clusters so we know what to remove, add or update. 1298 existingClusters, err := iac.Clusters(ctx, conf.InstanceID) 1299 if err != nil { 1300 return nil, err 1301 } 1302 1303 updatedInstance, err := iac.updateInstance(ctx, conf) 1304 if err != nil { 1305 return nil, err 1306 } 1307 1308 results := &UpdateInstanceResults{InstanceUpdated: updatedInstance} 1309 1310 existingClusterNames := make(map[string]bool) 1311 for _, cluster := range existingClusters { 1312 existingClusterNames[cluster.Name] = true 1313 } 1314 1315 // Synchronize clusters that were passed in with the existing clusters in the instance. 1316 // First update any cluster we encounter that already exists in the instance. 1317 // Collect the clusters that we will create and delete so that we can minimize disruption 1318 // of the instance. 1319 clustersToCreate := list.New() 1320 clustersToDelete := list.New() 1321 for _, cluster := range conf.Clusters { 1322 _, clusterExists := existingClusterNames[cluster.ClusterID] 1323 if !clusterExists { 1324 // The cluster doesn't exist yet, so we must create it. 1325 clustersToCreate.PushBack(cluster) 1326 continue 1327 } 1328 delete(existingClusterNames, cluster.ClusterID) 1329 1330 if cluster.NumNodes <= 0 { 1331 // We only synchronize clusters with a valid number of nodes. 
1332 continue 1333 } 1334 1335 // We simply want to update this cluster 1336 err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes) 1337 if err != nil { 1338 return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v", 1339 cluster.ClusterID, err, results) 1340 } 1341 results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID) 1342 } 1343 1344 // Any cluster left in existingClusterNames was NOT in the given config and should be deleted. 1345 for clusterToDelete := range existingClusterNames { 1346 clustersToDelete.PushBack(clusterToDelete) 1347 } 1348 1349 // Now that we have the clusters that we need to create and delete, we do so keeping the following 1350 // in mind: 1351 // - Don't delete the last cluster in the instance, as that will result in an error. 1352 // - Attempt to offset each deletion with a creation before another deletion, so that instance 1353 // capacity is never reduced more than necessary. 1354 // Note that there is a limit on number of clusters in an instance which we are not aware of here, 1355 // so delete a cluster before adding one (as long as there are > 1 clusters left) so that we are 1356 // less likely to exceed the maximum number of clusters. 1357 numExistingClusters := len(existingClusters) 1358 nextCreation := clustersToCreate.Front() 1359 nextDeletion := clustersToDelete.Front() 1360 for { 1361 // We are done when both lists are empty. 1362 if nextCreation == nil && nextDeletion == nil { 1363 break 1364 } 1365 1366 // If there is more than one existing cluster, we always want to delete first if possible. 1367 // If there are no more creations left, always go ahead with the deletion. 
1368 if (numExistingClusters > 1 && nextDeletion != nil) || nextCreation == nil { 1369 clusterToDelete := nextDeletion.Value.(string) 1370 err = iac.DeleteCluster(ctx, conf.InstanceID, clusterToDelete) 1371 if err != nil { 1372 return results, fmt.Errorf("DeleteCluster %q failed %v; Progress: %v", 1373 clusterToDelete, err, results) 1374 } 1375 results.DeletedClusters = append(results.DeletedClusters, clusterToDelete) 1376 numExistingClusters-- 1377 nextDeletion = nextDeletion.Next() 1378 } 1379 1380 // Now create a new cluster if required. 1381 if nextCreation != nil { 1382 clusterToCreate := nextCreation.Value.(ClusterConfig) 1383 // Assume the cluster config is well formed and rely on the underlying call to error out. 1384 // Make sure to set the InstanceID, though, since we know what it must be. 1385 clusterToCreate.InstanceID = conf.InstanceID 1386 err = iac.CreateCluster(ctx, &clusterToCreate) 1387 if err != nil { 1388 return results, fmt.Errorf("CreateCluster %v failed %v; Progress: %v", 1389 clusterToCreate, err, results) 1390 } 1391 results.CreatedClusters = append(results.CreatedClusters, clusterToCreate.ClusterID) 1392 numExistingClusters++ 1393 nextCreation = nextCreation.Next() 1394 } 1395 } 1396 1397 return results, nil 1398} 1399