/*
Copyright 2015 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable

import (
	"container/list"
	"context"
	"errors"
	"fmt"
	"math"
	"regexp"
	"strings"
	"time"

	btopt "cloud.google.com/go/bigtable/internal/option"
	"cloud.google.com/go/iam"
	"cloud.google.com/go/internal/optional"
	"cloud.google.com/go/longrunning"
	lroauto "cloud.google.com/go/longrunning/autogen"
	"github.com/golang/protobuf/ptypes"
	durpb "github.com/golang/protobuf/ptypes/duration"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/cloudresourcemanager/v1"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	gtransport "google.golang.org/api/transport/grpc"
	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
	"google.golang.org/genproto/protobuf/field_mask"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// adminAddr is the endpoint passed to the default client options when an
// AdminClient is created.
const adminAddr = "bigtableadmin.googleapis.com:443"

// AdminClient is a client type for performing admin operations within a specific instance.
type AdminClient struct {
	// connPool is the connection pool dialed by NewAdminClient; Close closes it.
	connPool gtransport.ConnPool
	// tClient issues the table admin RPCs over connPool.
	tClient btapb.BigtableTableAdminClient
	// lroClient is used to wait on long-running operations returned by tClient.
	lroClient *lroauto.OperationsClient

	// project and instance identify the instance this client operates on;
	// together they form the "projects/<project>/instances/<instance>" prefix.
	project, instance string

	// Metadata to be sent with each request.
	md metadata.MD
}

// NewAdminClient creates a new AdminClient for a given project and instance.
63func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) { 64 o, err := btopt.DefaultClientOptions(adminAddr, AdminScope, clientUserAgent) 65 if err != nil { 66 return nil, err 67 } 68 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 69 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 70 // Need to add scopes for long running operations (for create table & snapshots) 71 o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope)) 72 o = append(o, opts...) 73 connPool, err := gtransport.DialPool(ctx, o...) 74 if err != nil { 75 return nil, fmt.Errorf("dialing: %v", err) 76 } 77 78 lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool)) 79 if err != nil { 80 // This error "should not happen", since we are just reusing old connection 81 // and never actually need to dial. 82 // If this does happen, we could leak conn. However, we cannot close conn: 83 // If the user invoked the function with option.WithGRPCConn, 84 // we would close a connection that's still in use. 85 // TODO(pongad): investigate error conditions. 86 return nil, err 87 } 88 89 return &AdminClient{ 90 connPool: connPool, 91 tClient: btapb.NewBigtableTableAdminClient(connPool), 92 lroClient: lroClient, 93 project: project, 94 instance: instance, 95 md: metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)), 96 }, nil 97} 98 99// Close closes the AdminClient. 100func (ac *AdminClient) Close() error { 101 return ac.connPool.Close() 102} 103 104func (ac *AdminClient) instancePrefix() string { 105 return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance) 106} 107 108// Tables returns a list of the tables in the instance. 
109func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { 110 ctx = mergeOutgoingMetadata(ctx, ac.md) 111 prefix := ac.instancePrefix() 112 req := &btapb.ListTablesRequest{ 113 Parent: prefix, 114 } 115 116 var res *btapb.ListTablesResponse 117 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 118 var err error 119 res, err = ac.tClient.ListTables(ctx, req) 120 return err 121 }, retryOptions...) 122 if err != nil { 123 return nil, err 124 } 125 126 names := make([]string, 0, len(res.Tables)) 127 for _, tbl := range res.Tables { 128 names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/")) 129 } 130 return names, nil 131} 132 133// TableConf contains all of the information necessary to create a table with column families. 134type TableConf struct { 135 TableID string 136 SplitKeys []string 137 // Families is a map from family name to GCPolicy 138 Families map[string]GCPolicy 139} 140 141// CreateTable creates a new table in the instance. 142// This method may return before the table's creation is complete. 143func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { 144 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table}) 145} 146 147// CreatePresplitTable creates a new table in the instance. 148// The list of row keys will be used to initially split the table into multiple tablets. 149// Given two split keys, "s1" and "s2", three tablets will be created, 150// spanning the key ranges: [, s1), [s1, s2), [s2, ). 151// This method may return before the table's creation is complete. 152func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error { 153 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys}) 154} 155 156// CreateTableFromConf creates a new table in the instance from the given configuration. 
157func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error { 158 ctx = mergeOutgoingMetadata(ctx, ac.md) 159 var reqSplits []*btapb.CreateTableRequest_Split 160 for _, split := range conf.SplitKeys { 161 reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)}) 162 } 163 var tbl btapb.Table 164 if conf.Families != nil { 165 tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) 166 for fam, policy := range conf.Families { 167 tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()} 168 } 169 } 170 prefix := ac.instancePrefix() 171 req := &btapb.CreateTableRequest{ 172 Parent: prefix, 173 TableId: conf.TableID, 174 Table: &tbl, 175 InitialSplits: reqSplits, 176 } 177 _, err := ac.tClient.CreateTable(ctx, req) 178 return err 179} 180 181// CreateColumnFamily creates a new column family in a table. 182func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error { 183 // TODO(dsymonds): Permit specifying gcexpr and any other family settings. 184 ctx = mergeOutgoingMetadata(ctx, ac.md) 185 prefix := ac.instancePrefix() 186 req := &btapb.ModifyColumnFamiliesRequest{ 187 Name: prefix + "/tables/" + table, 188 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 189 Id: family, 190 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, 191 }}, 192 } 193 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 194 return err 195} 196 197// DeleteTable deletes a table and all of its data. 198func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error { 199 ctx = mergeOutgoingMetadata(ctx, ac.md) 200 prefix := ac.instancePrefix() 201 req := &btapb.DeleteTableRequest{ 202 Name: prefix + "/tables/" + table, 203 } 204 _, err := ac.tClient.DeleteTable(ctx, req) 205 return err 206} 207 208// DeleteColumnFamily deletes a column family in a table and all of its data. 
209func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error { 210 ctx = mergeOutgoingMetadata(ctx, ac.md) 211 prefix := ac.instancePrefix() 212 req := &btapb.ModifyColumnFamiliesRequest{ 213 Name: prefix + "/tables/" + table, 214 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 215 Id: family, 216 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true}, 217 }}, 218 } 219 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 220 return err 221} 222 223// TableInfo represents information about a table. 224type TableInfo struct { 225 // DEPRECATED - This field is deprecated. Please use FamilyInfos instead. 226 Families []string 227 FamilyInfos []FamilyInfo 228} 229 230// FamilyInfo represents information about a column family. 231type FamilyInfo struct { 232 Name string 233 GCPolicy string 234} 235 236// TableInfo retrieves information about a table. 237func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) { 238 ctx = mergeOutgoingMetadata(ctx, ac.md) 239 prefix := ac.instancePrefix() 240 req := &btapb.GetTableRequest{ 241 Name: prefix + "/tables/" + table, 242 } 243 244 var res *btapb.Table 245 246 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 247 var err error 248 res, err = ac.tClient.GetTable(ctx, req) 249 return err 250 }, retryOptions...) 251 if err != nil { 252 return nil, err 253 } 254 255 ti := &TableInfo{} 256 for name, fam := range res.ColumnFamilies { 257 ti.Families = append(ti.Families, name) 258 ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)}) 259 } 260 return ti, nil 261} 262 263// SetGCPolicy specifies which cells in a column family should be garbage collected. 264// GC executes opportunistically in the background; table reads may return data 265// matching the GC policy. 
266func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { 267 ctx = mergeOutgoingMetadata(ctx, ac.md) 268 prefix := ac.instancePrefix() 269 req := &btapb.ModifyColumnFamiliesRequest{ 270 Name: prefix + "/tables/" + table, 271 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 272 Id: family, 273 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}}, 274 }}, 275 } 276 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 277 return err 278} 279 280// DropRowRange permanently deletes a row range from the specified table. 281func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error { 282 ctx = mergeOutgoingMetadata(ctx, ac.md) 283 prefix := ac.instancePrefix() 284 req := &btapb.DropRowRangeRequest{ 285 Name: prefix + "/tables/" + table, 286 Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)}, 287 } 288 _, err := ac.tClient.DropRowRange(ctx, req) 289 return err 290} 291 292// DropAllRows permanently deletes all rows from the specified table. 293func (ac *AdminClient) DropAllRows(ctx context.Context, table string) error { 294 ctx = mergeOutgoingMetadata(ctx, ac.md) 295 prefix := ac.instancePrefix() 296 req := &btapb.DropRowRangeRequest{ 297 Name: prefix + "/tables/" + table, 298 Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true}, 299 } 300 _, err := ac.tClient.DropRowRange(ctx, req) 301 return err 302} 303 304// CreateTableFromSnapshot creates a table from snapshot. 305// The table will be created in the same cluster as the snapshot. 306// 307// This is a private alpha release of Cloud Bigtable snapshots. This feature 308// is not currently available to most Cloud Bigtable customers. This feature 309// might be changed in backward-incompatible ways and is not recommended for 310// production use. It is not subject to any SLA or deprecation policy. 
311func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error { 312 ctx = mergeOutgoingMetadata(ctx, ac.md) 313 prefix := ac.instancePrefix() 314 snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot 315 316 req := &btapb.CreateTableFromSnapshotRequest{ 317 Parent: prefix, 318 TableId: table, 319 SourceSnapshot: snapshotPath, 320 } 321 op, err := ac.tClient.CreateTableFromSnapshot(ctx, req) 322 if err != nil { 323 return err 324 } 325 resp := btapb.Table{} 326 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 327} 328 329// DefaultSnapshotDuration is the default TTL for a snapshot. 330const DefaultSnapshotDuration time.Duration = 0 331 332// SnapshotTable creates a new snapshot in the specified cluster from the 333// specified source table. Setting the TTL to `DefaultSnapshotDuration` will 334// use the server side default for the duration. 335// 336// This is a private alpha release of Cloud Bigtable snapshots. This feature 337// is not currently available to most Cloud Bigtable customers. This feature 338// might be changed in backward-incompatible ways and is not recommended for 339// production use. It is not subject to any SLA or deprecation policy. 
340func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error { 341 ctx = mergeOutgoingMetadata(ctx, ac.md) 342 prefix := ac.instancePrefix() 343 344 var ttlProto *durpb.Duration 345 346 if ttl > 0 { 347 ttlProto = ptypes.DurationProto(ttl) 348 } 349 350 req := &btapb.SnapshotTableRequest{ 351 Name: prefix + "/tables/" + table, 352 Cluster: prefix + "/clusters/" + cluster, 353 SnapshotId: snapshot, 354 Ttl: ttlProto, 355 } 356 357 op, err := ac.tClient.SnapshotTable(ctx, req) 358 if err != nil { 359 return err 360 } 361 resp := btapb.Snapshot{} 362 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 363} 364 365// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster. 366// To list snapshots across all of the clusters in the instance specify "-" as the cluster. 367// 368// This is a private alpha release of Cloud Bigtable snapshots. This feature is not 369// currently available to most Cloud Bigtable customers. This feature might be 370// changed in backward-incompatible ways and is not recommended for production use. 371// It is not subject to any SLA or deprecation policy. 372func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator { 373 ctx = mergeOutgoingMetadata(ctx, ac.md) 374 prefix := ac.instancePrefix() 375 clusterPath := prefix + "/clusters/" + cluster 376 377 it := &SnapshotIterator{} 378 req := &btapb.ListSnapshotsRequest{ 379 Parent: clusterPath, 380 } 381 382 fetch := func(pageSize int, pageToken string) (string, error) { 383 req.PageToken = pageToken 384 if pageSize > math.MaxInt32 { 385 req.PageSize = math.MaxInt32 386 } else { 387 req.PageSize = int32(pageSize) 388 } 389 390 var resp *btapb.ListSnapshotsResponse 391 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 392 var err error 393 resp, err = ac.tClient.ListSnapshots(ctx, req) 394 return err 395 }, retryOptions...) 
396 if err != nil { 397 return "", err 398 } 399 for _, s := range resp.Snapshots { 400 snapshotInfo, err := newSnapshotInfo(s) 401 if err != nil { 402 return "", fmt.Errorf("failed to parse snapshot proto %v", err) 403 } 404 it.items = append(it.items, snapshotInfo) 405 } 406 return resp.NextPageToken, nil 407 } 408 bufLen := func() int { return len(it.items) } 409 takeBuf := func() interface{} { b := it.items; it.items = nil; return b } 410 411 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 412 413 return it 414} 415 416func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) { 417 nameParts := strings.Split(snapshot.Name, "/") 418 name := nameParts[len(nameParts)-1] 419 tablePathParts := strings.Split(snapshot.SourceTable.Name, "/") 420 tableID := tablePathParts[len(tablePathParts)-1] 421 422 createTime, err := ptypes.Timestamp(snapshot.CreateTime) 423 if err != nil { 424 return nil, fmt.Errorf("invalid createTime: %v", err) 425 } 426 427 deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime) 428 if err != nil { 429 return nil, fmt.Errorf("invalid deleteTime: %v", err) 430 } 431 432 return &SnapshotInfo{ 433 Name: name, 434 SourceTable: tableID, 435 DataSize: snapshot.DataSizeBytes, 436 CreateTime: createTime, 437 DeleteTime: deleteTime, 438 }, nil 439} 440 441// SnapshotIterator is an EntryIterator that iterates over log entries. 442// 443// This is a private alpha release of Cloud Bigtable snapshots. This feature 444// is not currently available to most Cloud Bigtable customers. This feature 445// might be changed in backward-incompatible ways and is not recommended for 446// production use. It is not subject to any SLA or deprecation policy. 447type SnapshotIterator struct { 448 items []*SnapshotInfo 449 pageInfo *iterator.PageInfo 450 nextFunc func() error 451} 452 453// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 
454func (it *SnapshotIterator) PageInfo() *iterator.PageInfo { 455 return it.pageInfo 456} 457 458// Next returns the next result. Its second return value is iterator.Done 459// (https://godoc.org/google.golang.org/api/iterator) if there are no more 460// results. Once Next returns Done, all subsequent calls will return Done. 461func (it *SnapshotIterator) Next() (*SnapshotInfo, error) { 462 if err := it.nextFunc(); err != nil { 463 return nil, err 464 } 465 item := it.items[0] 466 it.items = it.items[1:] 467 return item, nil 468} 469 470// SnapshotInfo contains snapshot metadata. 471type SnapshotInfo struct { 472 Name string 473 SourceTable string 474 DataSize int64 475 CreateTime time.Time 476 DeleteTime time.Time 477} 478 479// SnapshotInfo gets snapshot metadata. 480// 481// This is a private alpha release of Cloud Bigtable snapshots. This feature 482// is not currently available to most Cloud Bigtable customers. This feature 483// might be changed in backward-incompatible ways and is not recommended for 484// production use. It is not subject to any SLA or deprecation policy. 485func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) { 486 ctx = mergeOutgoingMetadata(ctx, ac.md) 487 prefix := ac.instancePrefix() 488 clusterPath := prefix + "/clusters/" + cluster 489 snapshotPath := clusterPath + "/snapshots/" + snapshot 490 491 req := &btapb.GetSnapshotRequest{ 492 Name: snapshotPath, 493 } 494 495 var resp *btapb.Snapshot 496 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 497 var err error 498 resp, err = ac.tClient.GetSnapshot(ctx, req) 499 return err 500 }, retryOptions...) 501 if err != nil { 502 return nil, err 503 } 504 505 return newSnapshotInfo(resp) 506} 507 508// DeleteSnapshot deletes a snapshot in a cluster. 509// 510// This is a private alpha release of Cloud Bigtable snapshots. This feature 511// is not currently available to most Cloud Bigtable customers. 
This feature 512// might be changed in backward-incompatible ways and is not recommended for 513// production use. It is not subject to any SLA or deprecation policy. 514func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error { 515 ctx = mergeOutgoingMetadata(ctx, ac.md) 516 prefix := ac.instancePrefix() 517 clusterPath := prefix + "/clusters/" + cluster 518 snapshotPath := clusterPath + "/snapshots/" + snapshot 519 520 req := &btapb.DeleteSnapshotRequest{ 521 Name: snapshotPath, 522 } 523 _, err := ac.tClient.DeleteSnapshot(ctx, req) 524 return err 525} 526 527// getConsistencyToken gets the consistency token for a table. 528func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) { 529 req := &btapb.GenerateConsistencyTokenRequest{ 530 Name: tableName, 531 } 532 resp, err := ac.tClient.GenerateConsistencyToken(ctx, req) 533 if err != nil { 534 return "", err 535 } 536 return resp.GetConsistencyToken(), nil 537} 538 539// isConsistent checks if a token is consistent for a table. 540func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) { 541 req := &btapb.CheckConsistencyRequest{ 542 Name: tableName, 543 ConsistencyToken: token, 544 } 545 var resp *btapb.CheckConsistencyResponse 546 547 // Retry calls on retryable errors to avoid losing the token gathered before. 548 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 549 var err error 550 resp, err = ac.tClient.CheckConsistency(ctx, req) 551 return err 552 }, retryOptions...) 553 if err != nil { 554 return false, err 555 } 556 return resp.GetConsistent(), nil 557} 558 559// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication. 
560func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error { 561 ctx = mergeOutgoingMetadata(ctx, ac.md) 562 // Get the token. 563 prefix := ac.instancePrefix() 564 tableName := prefix + "/tables/" + table 565 token, err := ac.getConsistencyToken(ctx, tableName) 566 if err != nil { 567 return err 568 } 569 570 // Periodically check if the token is consistent. 571 timer := time.NewTicker(time.Second * 10) 572 defer timer.Stop() 573 for { 574 consistent, err := ac.isConsistent(ctx, tableName, token) 575 if err != nil { 576 return err 577 } 578 if consistent { 579 return nil 580 } 581 // Sleep for a bit or until the ctx is cancelled. 582 select { 583 case <-ctx.Done(): 584 return ctx.Err() 585 case <-timer.C: 586 } 587 } 588} 589 590// TableIAM creates an IAM client specific to a given Instance and Table within the configured project. 591func (ac *AdminClient) TableIAM(tableID string) *iam.Handle { 592 return iam.InternalNewHandleGRPCClient(ac.tClient, 593 "projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID) 594} 595 596const instanceAdminAddr = "bigtableadmin.googleapis.com:443" 597 598// InstanceAdminClient is a client type for performing admin operations on instances. 599// These operations can be substantially more dangerous than those provided by AdminClient. 600type InstanceAdminClient struct { 601 connPool gtransport.ConnPool 602 iClient btapb.BigtableInstanceAdminClient 603 lroClient *lroauto.OperationsClient 604 605 project string 606 607 // Metadata to be sent with each request. 608 md metadata.MD 609} 610 611// NewInstanceAdminClient creates a new InstanceAdminClient for a given project. 
612func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) { 613 o, err := btopt.DefaultClientOptions(instanceAdminAddr, InstanceAdminScope, clientUserAgent) 614 if err != nil { 615 return nil, err 616 } 617 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 618 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 619 o = append(o, opts...) 620 connPool, err := gtransport.DialPool(ctx, o...) 621 if err != nil { 622 return nil, fmt.Errorf("dialing: %v", err) 623 } 624 625 lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool)) 626 if err != nil { 627 // This error "should not happen", since we are just reusing old connection 628 // and never actually need to dial. 629 // If this does happen, we could leak conn. However, we cannot close conn: 630 // If the user invoked the function with option.WithGRPCConn, 631 // we would close a connection that's still in use. 632 // TODO(pongad): investigate error conditions. 633 return nil, err 634 } 635 636 return &InstanceAdminClient{ 637 connPool: connPool, 638 iClient: btapb.NewBigtableInstanceAdminClient(connPool), 639 lroClient: lroClient, 640 641 project: project, 642 md: metadata.Pairs(resourcePrefixHeader, "projects/"+project), 643 }, nil 644} 645 646// Close closes the InstanceAdminClient. 
647func (iac *InstanceAdminClient) Close() error { 648 return iac.connPool.Close() 649} 650 651// StorageType is the type of storage used for all tables in an instance 652type StorageType int 653 654const ( 655 SSD StorageType = iota 656 HDD 657) 658 659func (st StorageType) proto() btapb.StorageType { 660 if st == HDD { 661 return btapb.StorageType_HDD 662 } 663 return btapb.StorageType_SSD 664} 665 666func storageTypeFromProto(st btapb.StorageType) StorageType { 667 if st == btapb.StorageType_HDD { 668 return HDD 669 } 670 671 return SSD 672} 673 674// InstanceType is the type of the instance 675type InstanceType int32 676 677const ( 678 // UNSPECIFIED instance types default to PRODUCTION 679 UNSPECIFIED InstanceType = InstanceType(btapb.Instance_TYPE_UNSPECIFIED) 680 PRODUCTION = InstanceType(btapb.Instance_PRODUCTION) 681 DEVELOPMENT = InstanceType(btapb.Instance_DEVELOPMENT) 682) 683 684// InstanceInfo represents information about an instance 685type InstanceInfo struct { 686 Name string // name of the instance 687 DisplayName string // display name for UIs 688 InstanceType InstanceType 689} 690 691// InstanceConf contains the information necessary to create an Instance 692type InstanceConf struct { 693 InstanceId, DisplayName, ClusterId, Zone string 694 // NumNodes must not be specified for DEVELOPMENT instance types 695 NumNodes int32 696 StorageType StorageType 697 InstanceType InstanceType 698} 699 700// InstanceWithClustersConfig contains the information necessary to create an Instance 701type InstanceWithClustersConfig struct { 702 InstanceID, DisplayName string 703 Clusters []ClusterConfig 704 InstanceType InstanceType 705} 706 707var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`) 708 709// CreateInstance creates a new instance in the project. 710// This method will return when the instance has been created or when an error occurs. 
711func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error { 712 ctx = mergeOutgoingMetadata(ctx, iac.md) 713 newConfig := InstanceWithClustersConfig{ 714 InstanceID: conf.InstanceId, 715 DisplayName: conf.DisplayName, 716 InstanceType: conf.InstanceType, 717 Clusters: []ClusterConfig{ 718 { 719 InstanceID: conf.InstanceId, 720 ClusterID: conf.ClusterId, 721 Zone: conf.Zone, 722 NumNodes: conf.NumNodes, 723 StorageType: conf.StorageType, 724 }, 725 }, 726 } 727 return iac.CreateInstanceWithClusters(ctx, &newConfig) 728} 729 730// CreateInstanceWithClusters creates a new instance with configured clusters in the project. 731// This method will return when the instance has been created or when an error occurs. 732func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 733 ctx = mergeOutgoingMetadata(ctx, iac.md) 734 clusters := make(map[string]*btapb.Cluster) 735 for _, cluster := range conf.Clusters { 736 clusters[cluster.ClusterID] = cluster.proto(iac.project) 737 } 738 739 req := &btapb.CreateInstanceRequest{ 740 Parent: "projects/" + iac.project, 741 InstanceId: conf.InstanceID, 742 Instance: &btapb.Instance{DisplayName: conf.DisplayName, Type: btapb.Instance_Type(conf.InstanceType)}, 743 Clusters: clusters, 744 } 745 746 lro, err := iac.iClient.CreateInstance(ctx, req) 747 if err != nil { 748 return err 749 } 750 resp := btapb.Instance{} 751 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 752} 753 754// updateInstance updates a single instance based on config fields that operate 755// at an instance level: DisplayName and InstanceType. 
756func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *InstanceWithClustersConfig) (updated bool, err error) { 757 if conf.InstanceID == "" { 758 return false, errors.New("InstanceID is required") 759 } 760 761 // Update the instance, if necessary 762 mask := &field_mask.FieldMask{} 763 ireq := &btapb.PartialUpdateInstanceRequest{ 764 Instance: &btapb.Instance{ 765 Name: "projects/" + iac.project + "/instances/" + conf.InstanceID, 766 }, 767 UpdateMask: mask, 768 } 769 if conf.DisplayName != "" { 770 ireq.Instance.DisplayName = conf.DisplayName 771 mask.Paths = append(mask.Paths, "display_name") 772 } 773 if btapb.Instance_Type(conf.InstanceType) != btapb.Instance_TYPE_UNSPECIFIED { 774 ireq.Instance.Type = btapb.Instance_Type(conf.InstanceType) 775 mask.Paths = append(mask.Paths, "type") 776 } 777 778 if len(mask.Paths) == 0 { 779 return false, nil 780 } 781 782 lro, err := iac.iClient.PartialUpdateInstance(ctx, ireq) 783 if err != nil { 784 return false, err 785 } 786 err = longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 787 if err != nil { 788 return false, err 789 } 790 791 return true, nil 792} 793 794// UpdateInstanceWithClusters updates an instance and its clusters. Updateable 795// fields are instance display name, instance type and cluster size. 796// The provided InstanceWithClustersConfig is used as follows: 797// - InstanceID is required 798// - DisplayName and InstanceType are updated only if they are not empty 799// - ClusterID is required for any provided cluster 800// - All other cluster fields are ignored except for NumNodes, which if set will be updated 801// 802// This method may return an error after partially succeeding, for example if the instance is updated 803// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to 804// determine the current state. 
805func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 806 ctx = mergeOutgoingMetadata(ctx, iac.md) 807 808 for _, cluster := range conf.Clusters { 809 if cluster.ClusterID == "" { 810 return errors.New("ClusterID is required for every cluster") 811 } 812 } 813 814 updatedInstance, err := iac.updateInstance(ctx, conf) 815 if err != nil { 816 return err 817 } 818 819 // Update any clusters 820 for _, cluster := range conf.Clusters { 821 err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes) 822 if err != nil { 823 if updatedInstance { 824 // We updated the instance, so note that in the error message. 825 return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded", 826 cluster.ClusterID, err) 827 } 828 return err 829 } 830 } 831 832 return nil 833} 834 835// DeleteInstance deletes an instance from the project. 836func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error { 837 ctx = mergeOutgoingMetadata(ctx, iac.md) 838 req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID} 839 _, err := iac.iClient.DeleteInstance(ctx, req) 840 return err 841} 842 843// Instances returns a list of instances in the project. 844func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) { 845 ctx = mergeOutgoingMetadata(ctx, iac.md) 846 req := &btapb.ListInstancesRequest{ 847 Parent: "projects/" + iac.project, 848 } 849 var res *btapb.ListInstancesResponse 850 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 851 var err error 852 res, err = iac.iClient.ListInstances(ctx, req) 853 return err 854 }, retryOptions...) 855 if err != nil { 856 return nil, err 857 } 858 if len(res.FailedLocations) > 0 { 859 // We don't have a good way to return a partial result in the face of some zones being unavailable. 860 // Fail the entire request. 
861 return nil, status.Errorf(codes.Unavailable, "Failed locations: %v", res.FailedLocations) 862 } 863 864 var is []*InstanceInfo 865 for _, i := range res.Instances { 866 m := instanceNameRegexp.FindStringSubmatch(i.Name) 867 if m == nil { 868 return nil, fmt.Errorf("malformed instance name %q", i.Name) 869 } 870 is = append(is, &InstanceInfo{ 871 Name: m[2], 872 DisplayName: i.DisplayName, 873 InstanceType: InstanceType(i.Type), 874 }) 875 } 876 return is, nil 877} 878 879// InstanceInfo returns information about an instance. 880func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) { 881 ctx = mergeOutgoingMetadata(ctx, iac.md) 882 req := &btapb.GetInstanceRequest{ 883 Name: "projects/" + iac.project + "/instances/" + instanceID, 884 } 885 var res *btapb.Instance 886 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 887 var err error 888 res, err = iac.iClient.GetInstance(ctx, req) 889 return err 890 }, retryOptions...) 891 if err != nil { 892 return nil, err 893 } 894 895 m := instanceNameRegexp.FindStringSubmatch(res.Name) 896 if m == nil { 897 return nil, fmt.Errorf("malformed instance name %q", res.Name) 898 } 899 return &InstanceInfo{ 900 Name: m[2], 901 DisplayName: res.DisplayName, 902 InstanceType: InstanceType(res.Type), 903 }, nil 904} 905 906// ClusterConfig contains the information necessary to create a cluster 907type ClusterConfig struct { 908 InstanceID, ClusterID, Zone string 909 NumNodes int32 910 StorageType StorageType 911} 912 913func (cc *ClusterConfig) proto(project string) *btapb.Cluster { 914 return &btapb.Cluster{ 915 ServeNodes: cc.NumNodes, 916 DefaultStorageType: cc.StorageType.proto(), 917 Location: "projects/" + project + "/locations/" + cc.Zone, 918 } 919} 920 921// ClusterInfo represents information about a cluster. 922type ClusterInfo struct { 923 Name string // name of the cluster 924 Zone string // GCP zone of the cluster (e.g. 
"us-central1-a") 925 ServeNodes int // number of allocated serve nodes 926 State string // state of the cluster 927 StorageType StorageType // the storage type of the cluster 928} 929 930// CreateCluster creates a new cluster in an instance. 931// This method will return when the cluster has been created or when an error occurs. 932func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error { 933 ctx = mergeOutgoingMetadata(ctx, iac.md) 934 935 req := &btapb.CreateClusterRequest{ 936 Parent: "projects/" + iac.project + "/instances/" + conf.InstanceID, 937 ClusterId: conf.ClusterID, 938 Cluster: conf.proto(iac.project), 939 } 940 941 lro, err := iac.iClient.CreateCluster(ctx, req) 942 if err != nil { 943 return err 944 } 945 resp := btapb.Cluster{} 946 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 947} 948 949// DeleteCluster deletes a cluster from an instance. 950func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error { 951 ctx = mergeOutgoingMetadata(ctx, iac.md) 952 req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 953 _, err := iac.iClient.DeleteCluster(ctx, req) 954 return err 955} 956 957// UpdateCluster updates attributes of a cluster 958func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error { 959 ctx = mergeOutgoingMetadata(ctx, iac.md) 960 cluster := &btapb.Cluster{ 961 Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID, 962 ServeNodes: serveNodes} 963 lro, err := iac.iClient.UpdateCluster(ctx, cluster) 964 if err != nil { 965 return err 966 } 967 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 968} 969 970// Clusters lists the clusters in an instance. 
971func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) { 972 ctx = mergeOutgoingMetadata(ctx, iac.md) 973 req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID} 974 var res *btapb.ListClustersResponse 975 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 976 var err error 977 res, err = iac.iClient.ListClusters(ctx, req) 978 return err 979 }, retryOptions...) 980 if err != nil { 981 return nil, err 982 } 983 // TODO(garyelliott): Deal with failed_locations. 984 var cis []*ClusterInfo 985 for _, c := range res.Clusters { 986 nameParts := strings.Split(c.Name, "/") 987 locParts := strings.Split(c.Location, "/") 988 cis = append(cis, &ClusterInfo{ 989 Name: nameParts[len(nameParts)-1], 990 Zone: locParts[len(locParts)-1], 991 ServeNodes: int(c.ServeNodes), 992 State: c.State.String(), 993 StorageType: storageTypeFromProto(c.DefaultStorageType), 994 }) 995 } 996 return cis, nil 997} 998 999// GetCluster fetches a cluster in an instance 1000func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) { 1001 ctx = mergeOutgoingMetadata(ctx, iac.md) 1002 req := &btapb.GetClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 1003 var c *btapb.Cluster 1004 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1005 var err error 1006 c, err = iac.iClient.GetCluster(ctx, req) 1007 return err 1008 }, retryOptions...) 
1009 if err != nil { 1010 return nil, err 1011 } 1012 1013 nameParts := strings.Split(c.Name, "/") 1014 locParts := strings.Split(c.Location, "/") 1015 cis := &ClusterInfo{ 1016 Name: nameParts[len(nameParts)-1], 1017 Zone: locParts[len(locParts)-1], 1018 ServeNodes: int(c.ServeNodes), 1019 State: c.State.String(), 1020 StorageType: storageTypeFromProto(c.DefaultStorageType), 1021 } 1022 return cis, nil 1023} 1024 1025// InstanceIAM returns the instance's IAM handle. 1026func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle { 1027 return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID) 1028} 1029 1030// Routing policies. 1031const ( 1032 // MultiClusterRouting is a policy that allows read/write requests to be 1033 // routed to any cluster in the instance. Requests will will fail over to 1034 // another cluster in the event of transient errors or delays. Choosing 1035 // this option sacrifices read-your-writes consistency to improve 1036 // availability. 1037 MultiClusterRouting = "multi_cluster_routing_use_any" 1038 // SingleClusterRouting is a policy that unconditionally routes all 1039 // read/write requests to a specific cluster. This option preserves 1040 // read-your-writes consistency, but does not improve availability. 1041 SingleClusterRouting = "single_cluster_routing" 1042) 1043 1044// ProfileConf contains the information necessary to create an profile 1045type ProfileConf struct { 1046 Name string 1047 ProfileID string 1048 InstanceID string 1049 Etag string 1050 Description string 1051 RoutingPolicy string 1052 ClusterID string 1053 AllowTransactionalWrites bool 1054 1055 // If true, warnings are ignored 1056 IgnoreWarnings bool 1057} 1058 1059// ProfileIterator iterates over profiles. 
1060type ProfileIterator struct { 1061 items []*btapb.AppProfile 1062 pageInfo *iterator.PageInfo 1063 nextFunc func() error 1064} 1065 1066// ProfileAttrsToUpdate define addrs to update during an Update call. If unset, no fields will be replaced. 1067type ProfileAttrsToUpdate struct { 1068 // If set, updates the description. 1069 Description optional.String 1070 1071 //If set, updates the routing policy. 1072 RoutingPolicy optional.String 1073 1074 //If RoutingPolicy is updated to SingleClusterRouting, set these fields as well. 1075 ClusterID string 1076 AllowTransactionalWrites bool 1077 1078 // If true, warnings are ignored 1079 IgnoreWarnings bool 1080} 1081 1082// GetFieldMaskPath returns the field mask path. 1083func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string { 1084 path := make([]string, 0) 1085 if p.Description != nil { 1086 path = append(path, "description") 1087 } 1088 1089 if p.RoutingPolicy != nil { 1090 path = append(path, optional.ToString(p.RoutingPolicy)) 1091 } 1092 return path 1093} 1094 1095// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 1096func (it *ProfileIterator) PageInfo() *iterator.PageInfo { 1097 return it.pageInfo 1098} 1099 1100// Next returns the next result. Its second return value is iterator.Done 1101// (https://godoc.org/google.golang.org/api/iterator) if there are no more 1102// results. Once Next returns Done, all subsequent calls will return Done. 1103func (it *ProfileIterator) Next() (*btapb.AppProfile, error) { 1104 if err := it.nextFunc(); err != nil { 1105 return nil, err 1106 } 1107 item := it.items[0] 1108 it.items = it.items[1:] 1109 return item, nil 1110} 1111 1112// CreateAppProfile creates an app profile within an instance. 
1113func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) { 1114 ctx = mergeOutgoingMetadata(ctx, iac.md) 1115 parent := "projects/" + iac.project + "/instances/" + profile.InstanceID 1116 appProfile := &btapb.AppProfile{ 1117 Etag: profile.Etag, 1118 Description: profile.Description, 1119 } 1120 1121 if profile.RoutingPolicy == "" { 1122 return nil, errors.New("invalid routing policy") 1123 } 1124 1125 switch profile.RoutingPolicy { 1126 case MultiClusterRouting: 1127 appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1128 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1129 } 1130 case SingleClusterRouting: 1131 appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1132 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1133 ClusterId: profile.ClusterID, 1134 AllowTransactionalWrites: profile.AllowTransactionalWrites, 1135 }, 1136 } 1137 default: 1138 return nil, errors.New("invalid routing policy") 1139 } 1140 1141 return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{ 1142 Parent: parent, 1143 AppProfile: appProfile, 1144 AppProfileId: profile.ProfileID, 1145 IgnoreWarnings: profile.IgnoreWarnings, 1146 }) 1147} 1148 1149// GetAppProfile gets information about an app profile. 1150func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) { 1151 ctx = mergeOutgoingMetadata(ctx, iac.md) 1152 profileRequest := &btapb.GetAppProfileRequest{ 1153 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1154 } 1155 var ap *btapb.AppProfile 1156 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1157 var err error 1158 ap, err = iac.iClient.GetAppProfile(ctx, profileRequest) 1159 return err 1160 }, retryOptions...) 
1161 if err != nil { 1162 return nil, err 1163 } 1164 return ap, err 1165} 1166 1167// ListAppProfiles lists information about app profiles in an instance. 1168func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator { 1169 ctx = mergeOutgoingMetadata(ctx, iac.md) 1170 listRequest := &btapb.ListAppProfilesRequest{ 1171 Parent: "projects/" + iac.project + "/instances/" + instanceID, 1172 } 1173 1174 pit := &ProfileIterator{} 1175 fetch := func(pageSize int, pageToken string) (string, error) { 1176 listRequest.PageToken = pageToken 1177 var profileRes *btapb.ListAppProfilesResponse 1178 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1179 var err error 1180 profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest) 1181 return err 1182 }, retryOptions...) 1183 if err != nil { 1184 return "", err 1185 } 1186 1187 pit.items = append(pit.items, profileRes.AppProfiles...) 1188 return profileRes.NextPageToken, nil 1189 } 1190 1191 bufLen := func() int { return len(pit.items) } 1192 takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b } 1193 pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 1194 return pit 1195 1196} 1197 1198// UpdateAppProfile updates an app profile within an instance. 1199// updateAttrs should be set. If unset, all fields will be replaced. 
1200func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error { 1201 ctx = mergeOutgoingMetadata(ctx, iac.md) 1202 1203 profile := &btapb.AppProfile{ 1204 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID, 1205 } 1206 1207 if updateAttrs.Description != nil { 1208 profile.Description = optional.ToString(updateAttrs.Description) 1209 } 1210 if updateAttrs.RoutingPolicy != nil { 1211 switch optional.ToString(updateAttrs.RoutingPolicy) { 1212 case MultiClusterRouting: 1213 profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1214 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1215 } 1216 case SingleClusterRouting: 1217 profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1218 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1219 ClusterId: updateAttrs.ClusterID, 1220 AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites, 1221 }, 1222 } 1223 default: 1224 return errors.New("invalid routing policy") 1225 } 1226 } 1227 patchRequest := &btapb.UpdateAppProfileRequest{ 1228 AppProfile: profile, 1229 UpdateMask: &field_mask.FieldMask{ 1230 Paths: updateAttrs.GetFieldMaskPath(), 1231 }, 1232 IgnoreWarnings: updateAttrs.IgnoreWarnings, 1233 } 1234 updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest) 1235 if err != nil { 1236 return err 1237 } 1238 1239 return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil) 1240 1241} 1242 1243// DeleteAppProfile deletes an app profile from an instance. 
1244func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error { 1245 ctx = mergeOutgoingMetadata(ctx, iac.md) 1246 deleteProfileRequest := &btapb.DeleteAppProfileRequest{ 1247 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1248 IgnoreWarnings: true, 1249 } 1250 _, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest) 1251 return err 1252 1253} 1254 1255// UpdateInstanceResults contains information about the 1256// changes made after invoking UpdateInstanceAndSyncClusters. 1257type UpdateInstanceResults struct { 1258 InstanceUpdated bool 1259 CreatedClusters []string 1260 DeletedClusters []string 1261 UpdatedClusters []string 1262} 1263 1264func (r *UpdateInstanceResults) String() string { 1265 return fmt.Sprintf("Instance updated? %v Clusters added:%v Clusters deleted:%v Clusters updated:%v", 1266 r.InstanceUpdated, r.CreatedClusters, r.DeletedClusters, r.UpdatedClusters) 1267} 1268 1269func max(x, y int) int { 1270 if x > y { 1271 return x 1272 } 1273 return y 1274} 1275 1276// UpdateInstanceAndSyncClusters updates an instance and its clusters, and will synchronize the 1277// clusters in the instance with the provided clusters, creating and deleting them as necessary. 1278// The provided InstanceWithClustersConfig is used as follows: 1279// - InstanceID is required 1280// - DisplayName and InstanceType are updated only if they are not empty 1281// - ClusterID is required for any provided cluster 1282// - Any cluster present in conf.Clusters but not part of the instance will be created using CreateCluster 1283// and the given ClusterConfig. 1284// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance 1285// using DeleteCluster. 1286// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the 1287// provided number of nodes if set. 
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
// determine the current state. The returned UpdateInstanceResults will describe the work done by the
// method, whether partial or complete. It is nil when listing the existing clusters or updating the
// instance itself fails.
func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient, conf *InstanceWithClustersConfig) (*UpdateInstanceResults, error) {
	ctx = mergeOutgoingMetadata(ctx, iac.md)

	// First fetch the existing clusters so we know what to remove, add or update.
	existingClusters, err := iac.Clusters(ctx, conf.InstanceID)
	if err != nil {
		return nil, err
	}

	updatedInstance, err := iac.updateInstance(ctx, conf)
	if err != nil {
		return nil, err
	}

	// From here on, results is returned alongside any error so the caller can
	// see how far the synchronization got.
	results := &UpdateInstanceResults{InstanceUpdated: updatedInstance}

	// Set of cluster names currently in the instance. Entries are removed as
	// they are matched against conf.Clusters, so whatever remains afterwards
	// is scheduled for deletion.
	existingClusterNames := make(map[string]bool)
	for _, cluster := range existingClusters {
		existingClusterNames[cluster.Name] = true
	}

	// Synchronize clusters that were passed in with the existing clusters in the instance.
	// First update any cluster we encounter that already exists in the instance.
	// Collect the clusters that we will create and delete so that we can minimize disruption
	// of the instance.
	clustersToCreate := list.New() // holds ClusterConfig values
	clustersToDelete := list.New() // holds cluster names (string)
	for _, cluster := range conf.Clusters {
		_, clusterExists := existingClusterNames[cluster.ClusterID]
		if !clusterExists {
			// The cluster doesn't exist yet, so we must create it.
			clustersToCreate.PushBack(cluster)
			continue
		}
		// The cluster is in the config, so it must not be deleted later.
		delete(existingClusterNames, cluster.ClusterID)

		if cluster.NumNodes <= 0 {
			// We only synchronize clusters with a valid number of nodes.
			continue
		}

		// We simply want to update this cluster
		err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
		if err != nil {
			return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
				cluster.ClusterID, err, results)
		}
		results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
	}

	// Any cluster left in existingClusterNames was NOT in the given config and should be deleted.
	// Map iteration order is unspecified, so the deletion order is too.
	for clusterToDelete := range existingClusterNames {
		clustersToDelete.PushBack(clusterToDelete)
	}

	// Now that we have the clusters that we need to create and delete, we do so keeping the following
	// in mind:
	// - Don't delete the last cluster in the instance, as that will result in an error.
	// - Attempt to offset each deletion with a creation before another deletion, so that instance
	// capacity is never reduced more than necessary.
	// Note that there is a limit on number of clusters in an instance which we are not aware of here,
	// so delete a cluster before adding one (as long as there are > 1 clusters left) so that we are
	// less likely to exceed the maximum number of clusters.
	numExistingClusters := len(existingClusters)
	nextCreation := clustersToCreate.Front()
	nextDeletion := clustersToDelete.Front()
	for {
		// We are done when both lists are empty.
		if nextCreation == nil && nextDeletion == nil {
			break
		}

		// If there is more than one existing cluster, we always want to delete first if possible.
		// If there are no more creations left, always go ahead with the deletion.
		// (nextDeletion cannot be nil when nextCreation is nil, because the
		// check above would already have ended the loop.)
		if (numExistingClusters > 1 && nextDeletion != nil) || nextCreation == nil {
			clusterToDelete := nextDeletion.Value.(string)
			err = iac.DeleteCluster(ctx, conf.InstanceID, clusterToDelete)
			if err != nil {
				return results, fmt.Errorf("DeleteCluster %q failed %v; Progress: %v",
					clusterToDelete, err, results)
			}
			results.DeletedClusters = append(results.DeletedClusters, clusterToDelete)
			numExistingClusters--
			nextDeletion = nextDeletion.Next()
		}

		// Now create a new cluster if required.
		if nextCreation != nil {
			// The type assertion yields a copy, so setting InstanceID below
			// does not modify the caller's conf.Clusters.
			clusterToCreate := nextCreation.Value.(ClusterConfig)
			// Assume the cluster config is well formed and rely on the underlying call to error out.
			// Make sure to set the InstanceID, though, since we know what it must be.
			clusterToCreate.InstanceID = conf.InstanceID
			err = iac.CreateCluster(ctx, &clusterToCreate)
			if err != nil {
				return results, fmt.Errorf("CreateCluster %v failed %v; Progress: %v",
					clusterToCreate, err, results)
			}
			results.CreatedClusters = append(results.CreatedClusters, clusterToCreate.ClusterID)
			numExistingClusters++
			nextCreation = nextCreation.Next()
		}
	}

	return results, nil
}