1/* 2Copyright 2015 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "container/list" 21 "context" 22 "errors" 23 "fmt" 24 "math" 25 "regexp" 26 "strings" 27 "time" 28 29 btopt "cloud.google.com/go/bigtable/internal/option" 30 "cloud.google.com/go/iam" 31 "cloud.google.com/go/internal/optional" 32 "cloud.google.com/go/longrunning" 33 lroauto "cloud.google.com/go/longrunning/autogen" 34 "github.com/golang/protobuf/ptypes" 35 durpb "github.com/golang/protobuf/ptypes/duration" 36 gax "github.com/googleapis/gax-go/v2" 37 "google.golang.org/api/cloudresourcemanager/v1" 38 "google.golang.org/api/iterator" 39 "google.golang.org/api/option" 40 gtransport "google.golang.org/api/transport/grpc" 41 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 42 "google.golang.org/genproto/googleapis/rpc/status" 43 "google.golang.org/genproto/protobuf/field_mask" 44 "google.golang.org/grpc/metadata" 45) 46 47const adminAddr = "bigtableadmin.googleapis.com:443" 48const mtlsAdminAddr = "bigtableadmin.mtls.googleapis.com:443" 49 50// ErrPartiallyUnavailable is returned when some locations (clusters) are 51// unavailable. Both partial results (retrieved from available locations) 52// and the error are returned when this exception occurred. 
53type ErrPartiallyUnavailable struct { 54 Locations []string // unavailable locations 55} 56 57func (e ErrPartiallyUnavailable) Error() string { 58 return fmt.Sprintf("Unavailable locations: %v", e.Locations) 59} 60 61// AdminClient is a client type for performing admin operations within a specific instance. 62type AdminClient struct { 63 connPool gtransport.ConnPool 64 tClient btapb.BigtableTableAdminClient 65 lroClient *lroauto.OperationsClient 66 67 project, instance string 68 69 // Metadata to be sent with each request. 70 md metadata.MD 71} 72 73// NewAdminClient creates a new AdminClient for a given project and instance. 74func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) { 75 o, err := btopt.DefaultClientOptions(adminAddr, mtlsAdminAddr, AdminScope, clientUserAgent) 76 if err != nil { 77 return nil, err 78 } 79 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 80 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 81 // Need to add scopes for long running operations (for create table & snapshots) 82 o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope)) 83 o = append(o, opts...) 84 connPool, err := gtransport.DialPool(ctx, o...) 85 if err != nil { 86 return nil, fmt.Errorf("dialing: %v", err) 87 } 88 89 lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool)) 90 if err != nil { 91 // This error "should not happen", since we are just reusing old connection 92 // and never actually need to dial. 93 // If this does happen, we could leak conn. However, we cannot close conn: 94 // If the user invoked the function with option.WithGRPCConn, 95 // we would close a connection that's still in use. 96 // TODO(pongad): investigate error conditions. 
97 return nil, err 98 } 99 100 return &AdminClient{ 101 connPool: connPool, 102 tClient: btapb.NewBigtableTableAdminClient(connPool), 103 lroClient: lroClient, 104 project: project, 105 instance: instance, 106 md: metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)), 107 }, nil 108} 109 110// Close closes the AdminClient. 111func (ac *AdminClient) Close() error { 112 return ac.connPool.Close() 113} 114 115func (ac *AdminClient) instancePrefix() string { 116 return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance) 117} 118 119func (ac *AdminClient) backupPath(cluster, instance, backup string) string { 120 return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup) 121} 122 123// EncryptionInfo represents the encryption info of a table. 124type EncryptionInfo struct { 125 Status *Status 126 Type EncryptionType 127 KMSKeyVersion string 128} 129 130func newEncryptionInfo(pbInfo *btapb.EncryptionInfo) *EncryptionInfo { 131 return &EncryptionInfo{ 132 Status: pbInfo.EncryptionStatus, 133 Type: EncryptionType(pbInfo.EncryptionType.Number()), 134 KMSKeyVersion: pbInfo.KmsKeyVersion, 135 } 136} 137 138// Status references google.golang.org/grpc/status. 139// It represents an RPC status code, message, and details of EncryptionInfo. 140// https://pkg.go.dev/google.golang.org/grpc/internal/status 141type Status = status.Status 142 143// EncryptionType is the type of encryption for an instance. 144type EncryptionType int32 145 146const ( 147 // EncryptionTypeUnspecified is the type was not specified, though data at rest remains encrypted. 148 EncryptionTypeUnspecified EncryptionType = iota 149 // GoogleDefaultEncryption represents that data backing this resource is 150 // encrypted at rest with a key that is fully managed by Google. No key 151 // version or status will be populated. This is the default state. 
152 GoogleDefaultEncryption 153 // CustomerManagedEncryption represents that data backing this resource is 154 // encrypted at rest with a key that is managed by the customer. 155 // The in-use version of the key and its status are populated for 156 // CMEK-protected tables. 157 // CMEK-protected backups are pinned to the key version that was in use at 158 // the time the backup was taken. This key version is populated but its 159 // status is not tracked and is reported as `UNKNOWN`. 160 CustomerManagedEncryption 161) 162 163// EncryptionInfoByCluster is a map of cluster name to EncryptionInfo 164type EncryptionInfoByCluster map[string][]*EncryptionInfo 165 166// EncryptionInfo gets the current encryption info for the table across all of the clusters. 167// The returned map will be keyed by cluster id and contain a status for all of the keys in use. 168func (ac *AdminClient) EncryptionInfo(ctx context.Context, table string) (EncryptionInfoByCluster, error) { 169 ctx = mergeOutgoingMetadata(ctx, ac.md) 170 171 res, err := ac.getTable(ctx, table, btapb.Table_ENCRYPTION_VIEW) 172 if err != nil { 173 return nil, err 174 } 175 encryptionInfo := EncryptionInfoByCluster{} 176 for key, cs := range res.ClusterStates { 177 for _, pbInfo := range cs.EncryptionInfo { 178 info := EncryptionInfo{} 179 info.Status = pbInfo.EncryptionStatus 180 info.Type = EncryptionType(pbInfo.EncryptionType.Number()) 181 info.KMSKeyVersion = pbInfo.KmsKeyVersion 182 encryptionInfo[key] = append(encryptionInfo[key], &info) 183 } 184 } 185 186 return encryptionInfo, nil 187} 188 189// Tables returns a list of the tables in the instance. 
190func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { 191 ctx = mergeOutgoingMetadata(ctx, ac.md) 192 prefix := ac.instancePrefix() 193 req := &btapb.ListTablesRequest{ 194 Parent: prefix, 195 } 196 197 var res *btapb.ListTablesResponse 198 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 199 var err error 200 res, err = ac.tClient.ListTables(ctx, req) 201 return err 202 }, retryOptions...) 203 if err != nil { 204 return nil, err 205 } 206 207 names := make([]string, 0, len(res.Tables)) 208 for _, tbl := range res.Tables { 209 names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/")) 210 } 211 return names, nil 212} 213 214// TableConf contains all of the information necessary to create a table with column families. 215type TableConf struct { 216 TableID string 217 SplitKeys []string 218 // Families is a map from family name to GCPolicy 219 Families map[string]GCPolicy 220} 221 222// CreateTable creates a new table in the instance. 223// This method may return before the table's creation is complete. 224func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { 225 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table}) 226} 227 228// CreatePresplitTable creates a new table in the instance. 229// The list of row keys will be used to initially split the table into multiple tablets. 230// Given two split keys, "s1" and "s2", three tablets will be created, 231// spanning the key ranges: [, s1), [s1, s2), [s2, ). 232// This method may return before the table's creation is complete. 233func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error { 234 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys}) 235} 236 237// CreateTableFromConf creates a new table in the instance from the given configuration. 
238func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error { 239 ctx = mergeOutgoingMetadata(ctx, ac.md) 240 var reqSplits []*btapb.CreateTableRequest_Split 241 for _, split := range conf.SplitKeys { 242 reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)}) 243 } 244 var tbl btapb.Table 245 if conf.Families != nil { 246 tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) 247 for fam, policy := range conf.Families { 248 tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()} 249 } 250 } 251 prefix := ac.instancePrefix() 252 req := &btapb.CreateTableRequest{ 253 Parent: prefix, 254 TableId: conf.TableID, 255 Table: &tbl, 256 InitialSplits: reqSplits, 257 } 258 _, err := ac.tClient.CreateTable(ctx, req) 259 return err 260} 261 262// CreateColumnFamily creates a new column family in a table. 263func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error { 264 // TODO(dsymonds): Permit specifying gcexpr and any other family settings. 265 ctx = mergeOutgoingMetadata(ctx, ac.md) 266 prefix := ac.instancePrefix() 267 req := &btapb.ModifyColumnFamiliesRequest{ 268 Name: prefix + "/tables/" + table, 269 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 270 Id: family, 271 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, 272 }}, 273 } 274 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 275 return err 276} 277 278// DeleteTable deletes a table and all of its data. 279func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error { 280 ctx = mergeOutgoingMetadata(ctx, ac.md) 281 prefix := ac.instancePrefix() 282 req := &btapb.DeleteTableRequest{ 283 Name: prefix + "/tables/" + table, 284 } 285 _, err := ac.tClient.DeleteTable(ctx, req) 286 return err 287} 288 289// DeleteColumnFamily deletes a column family in a table and all of its data. 
290func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error { 291 ctx = mergeOutgoingMetadata(ctx, ac.md) 292 prefix := ac.instancePrefix() 293 req := &btapb.ModifyColumnFamiliesRequest{ 294 Name: prefix + "/tables/" + table, 295 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 296 Id: family, 297 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true}, 298 }}, 299 } 300 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 301 return err 302} 303 304// TableInfo represents information about a table. 305type TableInfo struct { 306 // DEPRECATED - This field is deprecated. Please use FamilyInfos instead. 307 Families []string 308 FamilyInfos []FamilyInfo 309} 310 311// FamilyInfo represents information about a column family. 312type FamilyInfo struct { 313 Name string 314 GCPolicy string 315} 316 317func (ac *AdminClient) getTable(ctx context.Context, table string, view btapb.Table_View) (*btapb.Table, error) { 318 ctx = mergeOutgoingMetadata(ctx, ac.md) 319 prefix := ac.instancePrefix() 320 req := &btapb.GetTableRequest{ 321 Name: prefix + "/tables/" + table, 322 View: view, 323 } 324 325 var res *btapb.Table 326 327 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 328 var err error 329 res, err = ac.tClient.GetTable(ctx, req) 330 return err 331 }, retryOptions...) 332 if err != nil { 333 return nil, err 334 } 335 return res, nil 336} 337 338// TableInfo retrieves information about a table. 
339func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) { 340 ctx = mergeOutgoingMetadata(ctx, ac.md) 341 342 res, err := ac.getTable(ctx, table, btapb.Table_SCHEMA_VIEW) 343 if err != nil { 344 return nil, err 345 } 346 347 ti := &TableInfo{} 348 for name, fam := range res.ColumnFamilies { 349 ti.Families = append(ti.Families, name) 350 ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)}) 351 } 352 return ti, nil 353} 354 355// SetGCPolicy specifies which cells in a column family should be garbage collected. 356// GC executes opportunistically in the background; table reads may return data 357// matching the GC policy. 358func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { 359 ctx = mergeOutgoingMetadata(ctx, ac.md) 360 prefix := ac.instancePrefix() 361 req := &btapb.ModifyColumnFamiliesRequest{ 362 Name: prefix + "/tables/" + table, 363 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 364 Id: family, 365 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}}, 366 }}, 367 } 368 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 369 return err 370} 371 372// DropRowRange permanently deletes a row range from the specified table. 373func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error { 374 ctx = mergeOutgoingMetadata(ctx, ac.md) 375 prefix := ac.instancePrefix() 376 req := &btapb.DropRowRangeRequest{ 377 Name: prefix + "/tables/" + table, 378 Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)}, 379 } 380 _, err := ac.tClient.DropRowRange(ctx, req) 381 return err 382} 383 384// DropAllRows permanently deletes all rows from the specified table. 
385func (ac *AdminClient) DropAllRows(ctx context.Context, table string) error { 386 ctx = mergeOutgoingMetadata(ctx, ac.md) 387 prefix := ac.instancePrefix() 388 req := &btapb.DropRowRangeRequest{ 389 Name: prefix + "/tables/" + table, 390 Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true}, 391 } 392 _, err := ac.tClient.DropRowRange(ctx, req) 393 return err 394} 395 396// CreateTableFromSnapshot creates a table from snapshot. 397// The table will be created in the same cluster as the snapshot. 398// 399// This is a private alpha release of Cloud Bigtable snapshots. This feature 400// is not currently available to most Cloud Bigtable customers. This feature 401// might be changed in backward-incompatible ways and is not recommended for 402// production use. It is not subject to any SLA or deprecation policy. 403func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error { 404 ctx = mergeOutgoingMetadata(ctx, ac.md) 405 prefix := ac.instancePrefix() 406 snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot 407 408 req := &btapb.CreateTableFromSnapshotRequest{ 409 Parent: prefix, 410 TableId: table, 411 SourceSnapshot: snapshotPath, 412 } 413 op, err := ac.tClient.CreateTableFromSnapshot(ctx, req) 414 if err != nil { 415 return err 416 } 417 resp := btapb.Table{} 418 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 419} 420 421// DefaultSnapshotDuration is the default TTL for a snapshot. 422const DefaultSnapshotDuration time.Duration = 0 423 424// SnapshotTable creates a new snapshot in the specified cluster from the 425// specified source table. Setting the TTL to `DefaultSnapshotDuration` will 426// use the server side default for the duration. 427// 428// This is a private alpha release of Cloud Bigtable snapshots. This feature 429// is not currently available to most Cloud Bigtable customers. 
This feature 430// might be changed in backward-incompatible ways and is not recommended for 431// production use. It is not subject to any SLA or deprecation policy. 432func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error { 433 ctx = mergeOutgoingMetadata(ctx, ac.md) 434 prefix := ac.instancePrefix() 435 436 var ttlProto *durpb.Duration 437 438 if ttl > 0 { 439 ttlProto = ptypes.DurationProto(ttl) 440 } 441 442 req := &btapb.SnapshotTableRequest{ 443 Name: prefix + "/tables/" + table, 444 Cluster: prefix + "/clusters/" + cluster, 445 SnapshotId: snapshot, 446 Ttl: ttlProto, 447 } 448 449 op, err := ac.tClient.SnapshotTable(ctx, req) 450 if err != nil { 451 return err 452 } 453 resp := btapb.Snapshot{} 454 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 455} 456 457// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster. 458// To list snapshots across all of the clusters in the instance specify "-" as the cluster. 459// 460// This is a private alpha release of Cloud Bigtable snapshots. This feature is not 461// currently available to most Cloud Bigtable customers. This feature might be 462// changed in backward-incompatible ways and is not recommended for production use. 463// It is not subject to any SLA or deprecation policy. 
464func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator { 465 ctx = mergeOutgoingMetadata(ctx, ac.md) 466 prefix := ac.instancePrefix() 467 clusterPath := prefix + "/clusters/" + cluster 468 469 it := &SnapshotIterator{} 470 req := &btapb.ListSnapshotsRequest{ 471 Parent: clusterPath, 472 } 473 474 fetch := func(pageSize int, pageToken string) (string, error) { 475 req.PageToken = pageToken 476 if pageSize > math.MaxInt32 { 477 req.PageSize = math.MaxInt32 478 } else { 479 req.PageSize = int32(pageSize) 480 } 481 482 var resp *btapb.ListSnapshotsResponse 483 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 484 var err error 485 resp, err = ac.tClient.ListSnapshots(ctx, req) 486 return err 487 }, retryOptions...) 488 if err != nil { 489 return "", err 490 } 491 for _, s := range resp.Snapshots { 492 snapshotInfo, err := newSnapshotInfo(s) 493 if err != nil { 494 return "", fmt.Errorf("failed to parse snapshot proto %v", err) 495 } 496 it.items = append(it.items, snapshotInfo) 497 } 498 return resp.NextPageToken, nil 499 } 500 bufLen := func() int { return len(it.items) } 501 takeBuf := func() interface{} { b := it.items; it.items = nil; return b } 502 503 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 504 505 return it 506} 507 508func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) { 509 nameParts := strings.Split(snapshot.Name, "/") 510 name := nameParts[len(nameParts)-1] 511 tablePathParts := strings.Split(snapshot.SourceTable.Name, "/") 512 tableID := tablePathParts[len(tablePathParts)-1] 513 514 createTime, err := ptypes.Timestamp(snapshot.CreateTime) 515 if err != nil { 516 return nil, fmt.Errorf("invalid createTime: %v", err) 517 } 518 519 deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime) 520 if err != nil { 521 return nil, fmt.Errorf("invalid deleteTime: %v", err) 522 } 523 524 return &SnapshotInfo{ 525 Name: name, 526 SourceTable: tableID, 527 
DataSize: snapshot.DataSizeBytes, 528 CreateTime: createTime, 529 DeleteTime: deleteTime, 530 }, nil 531} 532 533// SnapshotIterator is an EntryIterator that iterates over log entries. 534// 535// This is a private alpha release of Cloud Bigtable snapshots. This feature 536// is not currently available to most Cloud Bigtable customers. This feature 537// might be changed in backward-incompatible ways and is not recommended for 538// production use. It is not subject to any SLA or deprecation policy. 539type SnapshotIterator struct { 540 items []*SnapshotInfo 541 pageInfo *iterator.PageInfo 542 nextFunc func() error 543} 544 545// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 546func (it *SnapshotIterator) PageInfo() *iterator.PageInfo { 547 return it.pageInfo 548} 549 550// Next returns the next result. Its second return value is iterator.Done 551// (https://godoc.org/google.golang.org/api/iterator) if there are no more 552// results. Once Next returns Done, all subsequent calls will return Done. 553func (it *SnapshotIterator) Next() (*SnapshotInfo, error) { 554 if err := it.nextFunc(); err != nil { 555 return nil, err 556 } 557 item := it.items[0] 558 it.items = it.items[1:] 559 return item, nil 560} 561 562// SnapshotInfo contains snapshot metadata. 563type SnapshotInfo struct { 564 Name string 565 SourceTable string 566 DataSize int64 567 CreateTime time.Time 568 DeleteTime time.Time 569} 570 571// SnapshotInfo gets snapshot metadata. 572// 573// This is a private alpha release of Cloud Bigtable snapshots. This feature 574// is not currently available to most Cloud Bigtable customers. This feature 575// might be changed in backward-incompatible ways and is not recommended for 576// production use. It is not subject to any SLA or deprecation policy. 
577func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) { 578 ctx = mergeOutgoingMetadata(ctx, ac.md) 579 prefix := ac.instancePrefix() 580 clusterPath := prefix + "/clusters/" + cluster 581 snapshotPath := clusterPath + "/snapshots/" + snapshot 582 583 req := &btapb.GetSnapshotRequest{ 584 Name: snapshotPath, 585 } 586 587 var resp *btapb.Snapshot 588 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 589 var err error 590 resp, err = ac.tClient.GetSnapshot(ctx, req) 591 return err 592 }, retryOptions...) 593 if err != nil { 594 return nil, err 595 } 596 597 return newSnapshotInfo(resp) 598} 599 600// DeleteSnapshot deletes a snapshot in a cluster. 601// 602// This is a private alpha release of Cloud Bigtable snapshots. This feature 603// is not currently available to most Cloud Bigtable customers. This feature 604// might be changed in backward-incompatible ways and is not recommended for 605// production use. It is not subject to any SLA or deprecation policy. 606func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error { 607 ctx = mergeOutgoingMetadata(ctx, ac.md) 608 prefix := ac.instancePrefix() 609 clusterPath := prefix + "/clusters/" + cluster 610 snapshotPath := clusterPath + "/snapshots/" + snapshot 611 612 req := &btapb.DeleteSnapshotRequest{ 613 Name: snapshotPath, 614 } 615 _, err := ac.tClient.DeleteSnapshot(ctx, req) 616 return err 617} 618 619// getConsistencyToken gets the consistency token for a table. 620func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) { 621 req := &btapb.GenerateConsistencyTokenRequest{ 622 Name: tableName, 623 } 624 resp, err := ac.tClient.GenerateConsistencyToken(ctx, req) 625 if err != nil { 626 return "", err 627 } 628 return resp.GetConsistencyToken(), nil 629} 630 631// isConsistent checks if a token is consistent for a table. 
632func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) { 633 req := &btapb.CheckConsistencyRequest{ 634 Name: tableName, 635 ConsistencyToken: token, 636 } 637 var resp *btapb.CheckConsistencyResponse 638 639 // Retry calls on retryable errors to avoid losing the token gathered before. 640 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 641 var err error 642 resp, err = ac.tClient.CheckConsistency(ctx, req) 643 return err 644 }, retryOptions...) 645 if err != nil { 646 return false, err 647 } 648 return resp.GetConsistent(), nil 649} 650 651// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication. 652func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error { 653 ctx = mergeOutgoingMetadata(ctx, ac.md) 654 // Get the token. 655 prefix := ac.instancePrefix() 656 tableName := prefix + "/tables/" + table 657 token, err := ac.getConsistencyToken(ctx, tableName) 658 if err != nil { 659 return err 660 } 661 662 // Periodically check if the token is consistent. 663 timer := time.NewTicker(time.Second * 10) 664 defer timer.Stop() 665 for { 666 consistent, err := ac.isConsistent(ctx, tableName, token) 667 if err != nil { 668 return err 669 } 670 if consistent { 671 return nil 672 } 673 // Sleep for a bit or until the ctx is cancelled. 674 select { 675 case <-ctx.Done(): 676 return ctx.Err() 677 case <-timer.C: 678 } 679 } 680} 681 682// TableIAM creates an IAM Handle specific to a given Instance and Table within the configured project. 683func (ac *AdminClient) TableIAM(tableID string) *iam.Handle { 684 return iam.InternalNewHandleGRPCClient(ac.tClient, 685 "projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID) 686} 687 688// BackupIAM creates an IAM Handle specific to a given Cluster and Backup. 
689func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle { 690 return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup)) 691} 692 693const instanceAdminAddr = "bigtableadmin.googleapis.com:443" 694const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443" 695 696// InstanceAdminClient is a client type for performing admin operations on instances. 697// These operations can be substantially more dangerous than those provided by AdminClient. 698type InstanceAdminClient struct { 699 connPool gtransport.ConnPool 700 iClient btapb.BigtableInstanceAdminClient 701 lroClient *lroauto.OperationsClient 702 703 project string 704 705 // Metadata to be sent with each request. 706 md metadata.MD 707} 708 709// NewInstanceAdminClient creates a new InstanceAdminClient for a given project. 710func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) { 711 o, err := btopt.DefaultClientOptions(instanceAdminAddr, mtlsInstanceAdminAddr, InstanceAdminScope, clientUserAgent) 712 if err != nil { 713 return nil, err 714 } 715 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 716 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 717 o = append(o, opts...) 718 connPool, err := gtransport.DialPool(ctx, o...) 719 if err != nil { 720 return nil, fmt.Errorf("dialing: %v", err) 721 } 722 723 lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool)) 724 if err != nil { 725 // This error "should not happen", since we are just reusing old connection 726 // and never actually need to dial. 727 // If this does happen, we could leak conn. However, we cannot close conn: 728 // If the user invoked the function with option.WithGRPCConn, 729 // we would close a connection that's still in use. 730 // TODO(pongad): investigate error conditions. 
731 return nil, err 732 } 733 734 return &InstanceAdminClient{ 735 connPool: connPool, 736 iClient: btapb.NewBigtableInstanceAdminClient(connPool), 737 lroClient: lroClient, 738 739 project: project, 740 md: metadata.Pairs(resourcePrefixHeader, "projects/"+project), 741 }, nil 742} 743 744// Close closes the InstanceAdminClient. 745func (iac *InstanceAdminClient) Close() error { 746 return iac.connPool.Close() 747} 748 749// StorageType is the type of storage used for all tables in an instance 750type StorageType int 751 752const ( 753 SSD StorageType = iota 754 HDD 755) 756 757func (st StorageType) proto() btapb.StorageType { 758 if st == HDD { 759 return btapb.StorageType_HDD 760 } 761 return btapb.StorageType_SSD 762} 763 764func storageTypeFromProto(st btapb.StorageType) StorageType { 765 if st == btapb.StorageType_HDD { 766 return HDD 767 } 768 769 return SSD 770} 771 772// InstanceState is the state of the instance. This is output-only. 773type InstanceState int32 774 775const ( 776 // NotKnown represents the state of an instance that could not be determined. 777 NotKnown InstanceState = InstanceState(btapb.Instance_STATE_NOT_KNOWN) 778 // Ready represents the state of an instance that has been successfully created. 779 Ready = InstanceState(btapb.Instance_READY) 780 // Creating represents the state of an instance that is currently being created. 781 Creating = InstanceState(btapb.Instance_CREATING) 782) 783 784// InstanceType is the type of the instance. 
785type InstanceType int32 786 787const ( 788 // UNSPECIFIED instance types default to PRODUCTION 789 UNSPECIFIED InstanceType = InstanceType(btapb.Instance_TYPE_UNSPECIFIED) 790 PRODUCTION = InstanceType(btapb.Instance_PRODUCTION) 791 DEVELOPMENT = InstanceType(btapb.Instance_DEVELOPMENT) 792) 793 794// InstanceInfo represents information about an instance 795type InstanceInfo struct { 796 Name string // name of the instance 797 DisplayName string // display name for UIs 798 InstanceState InstanceState 799 InstanceType InstanceType 800 Labels map[string]string 801} 802 803// InstanceConf contains the information necessary to create an Instance 804type InstanceConf struct { 805 InstanceId, DisplayName, ClusterId, Zone string 806 // NumNodes must not be specified for DEVELOPMENT instance types 807 NumNodes int32 808 StorageType StorageType 809 InstanceType InstanceType 810 Labels map[string]string 811} 812 813// InstanceWithClustersConfig contains the information necessary to create an Instance 814type InstanceWithClustersConfig struct { 815 InstanceID, DisplayName string 816 Clusters []ClusterConfig 817 InstanceType InstanceType 818 Labels map[string]string 819} 820 821var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`) 822 823// CreateInstance creates a new instance in the project. 824// This method will return when the instance has been created or when an error occurs. 
825func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error { 826 ctx = mergeOutgoingMetadata(ctx, iac.md) 827 newConfig := InstanceWithClustersConfig{ 828 InstanceID: conf.InstanceId, 829 DisplayName: conf.DisplayName, 830 InstanceType: conf.InstanceType, 831 Labels: conf.Labels, 832 Clusters: []ClusterConfig{ 833 { 834 InstanceID: conf.InstanceId, 835 ClusterID: conf.ClusterId, 836 Zone: conf.Zone, 837 NumNodes: conf.NumNodes, 838 StorageType: conf.StorageType, 839 }, 840 }, 841 } 842 return iac.CreateInstanceWithClusters(ctx, &newConfig) 843} 844 845// CreateInstanceWithClusters creates a new instance with configured clusters in the project. 846// This method will return when the instance has been created or when an error occurs. 847func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 848 ctx = mergeOutgoingMetadata(ctx, iac.md) 849 clusters := make(map[string]*btapb.Cluster) 850 for _, cluster := range conf.Clusters { 851 clusters[cluster.ClusterID] = cluster.proto(iac.project) 852 } 853 854 req := &btapb.CreateInstanceRequest{ 855 Parent: "projects/" + iac.project, 856 InstanceId: conf.InstanceID, 857 Instance: &btapb.Instance{ 858 DisplayName: conf.DisplayName, 859 Type: btapb.Instance_Type(conf.InstanceType), 860 Labels: conf.Labels, 861 }, 862 Clusters: clusters, 863 } 864 865 lro, err := iac.iClient.CreateInstance(ctx, req) 866 if err != nil { 867 return err 868 } 869 resp := btapb.Instance{} 870 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 871} 872 873// updateInstance updates a single instance based on config fields that operate 874// at an instance level: DisplayName and InstanceType. 
875func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *InstanceWithClustersConfig) (updated bool, err error) { 876 if conf.InstanceID == "" { 877 return false, errors.New("InstanceID is required") 878 } 879 880 // Update the instance, if necessary 881 mask := &field_mask.FieldMask{} 882 ireq := &btapb.PartialUpdateInstanceRequest{ 883 Instance: &btapb.Instance{ 884 Name: "projects/" + iac.project + "/instances/" + conf.InstanceID, 885 }, 886 UpdateMask: mask, 887 } 888 if conf.DisplayName != "" { 889 ireq.Instance.DisplayName = conf.DisplayName 890 mask.Paths = append(mask.Paths, "display_name") 891 } 892 if btapb.Instance_Type(conf.InstanceType) != btapb.Instance_TYPE_UNSPECIFIED { 893 ireq.Instance.Type = btapb.Instance_Type(conf.InstanceType) 894 mask.Paths = append(mask.Paths, "type") 895 } 896 if conf.Labels != nil { 897 ireq.Instance.Labels = conf.Labels 898 mask.Paths = append(mask.Paths, "labels") 899 } 900 901 if len(mask.Paths) == 0 { 902 return false, nil 903 } 904 905 lro, err := iac.iClient.PartialUpdateInstance(ctx, ireq) 906 if err != nil { 907 return false, err 908 } 909 err = longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 910 if err != nil { 911 return false, err 912 } 913 914 return true, nil 915} 916 917// UpdateInstanceWithClusters updates an instance and its clusters. Updateable 918// fields are instance display name, instance type and cluster size. 919// The provided InstanceWithClustersConfig is used as follows: 920// - InstanceID is required 921// - DisplayName and InstanceType are updated only if they are not empty 922// - ClusterID is required for any provided cluster 923// - All other cluster fields are ignored except for NumNodes, which if set will be updated 924// 925// This method may return an error after partially succeeding, for example if the instance is updated 926// but a cluster update fails. 
If an error is returned, InstanceInfo and Clusters may be called to 927// determine the current state. 928func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 929 ctx = mergeOutgoingMetadata(ctx, iac.md) 930 931 for _, cluster := range conf.Clusters { 932 if cluster.ClusterID == "" { 933 return errors.New("ClusterID is required for every cluster") 934 } 935 } 936 937 updatedInstance, err := iac.updateInstance(ctx, conf) 938 if err != nil { 939 return err 940 } 941 942 // Update any clusters 943 for _, cluster := range conf.Clusters { 944 err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes) 945 if err != nil { 946 if updatedInstance { 947 // We updated the instance, so note that in the error message. 948 return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded", 949 cluster.ClusterID, err) 950 } 951 return err 952 } 953 } 954 955 return nil 956} 957 958// DeleteInstance deletes an instance from the project. 959func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error { 960 ctx = mergeOutgoingMetadata(ctx, iac.md) 961 req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID} 962 _, err := iac.iClient.DeleteInstance(ctx, req) 963 return err 964} 965 966// Instances returns a list of instances in the project. If any location 967// (cluster) is unavailable due to some transient conditions, Instances 968// returns partial results and ErrPartiallyUnavailable error with 969// unavailable locations list. 
970func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) { 971 ctx = mergeOutgoingMetadata(ctx, iac.md) 972 req := &btapb.ListInstancesRequest{ 973 Parent: "projects/" + iac.project, 974 } 975 var res *btapb.ListInstancesResponse 976 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 977 var err error 978 res, err = iac.iClient.ListInstances(ctx, req) 979 return err 980 }, retryOptions...) 981 if err != nil { 982 return nil, err 983 } 984 985 var is []*InstanceInfo 986 for _, i := range res.Instances { 987 m := instanceNameRegexp.FindStringSubmatch(i.Name) 988 if m == nil { 989 return nil, fmt.Errorf("malformed instance name %q", i.Name) 990 } 991 is = append(is, &InstanceInfo{ 992 Name: m[2], 993 DisplayName: i.DisplayName, 994 InstanceState: InstanceState(i.State), 995 InstanceType: InstanceType(i.Type), 996 Labels: i.Labels, 997 }) 998 } 999 if len(res.FailedLocations) > 0 { 1000 // Return partial results and an error in 1001 // case of some locations are unavailable. 1002 return is, ErrPartiallyUnavailable{res.FailedLocations} 1003 } 1004 return is, nil 1005} 1006 1007// InstanceInfo returns information about an instance. 1008func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) { 1009 ctx = mergeOutgoingMetadata(ctx, iac.md) 1010 req := &btapb.GetInstanceRequest{ 1011 Name: "projects/" + iac.project + "/instances/" + instanceID, 1012 } 1013 var res *btapb.Instance 1014 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1015 var err error 1016 res, err = iac.iClient.GetInstance(ctx, req) 1017 return err 1018 }, retryOptions...) 
1019 if err != nil { 1020 return nil, err 1021 } 1022 1023 m := instanceNameRegexp.FindStringSubmatch(res.Name) 1024 if m == nil { 1025 return nil, fmt.Errorf("malformed instance name %q", res.Name) 1026 } 1027 return &InstanceInfo{ 1028 Name: m[2], 1029 DisplayName: res.DisplayName, 1030 InstanceState: InstanceState(res.State), 1031 InstanceType: InstanceType(res.Type), 1032 Labels: res.Labels, 1033 }, nil 1034} 1035 1036// ClusterConfig contains the information necessary to create a cluster 1037type ClusterConfig struct { 1038 // InstanceID specifies the unique name of the instance. Required. 1039 InstanceID string 1040 1041 // ClusterID specifies the unique name of the cluster. Required. 1042 ClusterID string 1043 1044 // Zone specifies the location where this cluster's nodes and storage reside. 1045 // For best performance, clients should be located as close as possible to this 1046 // cluster. Required. 1047 Zone string 1048 1049 // NumNodes specifies the number of nodes allocated to this cluster. More 1050 // nodes enable higher throughput and more consistent performance. Required. 1051 NumNodes int32 1052 1053 // StorageType specifies the type of storage used by this cluster to serve 1054 // its parent instance's tables, unless explicitly overridden. Required. 1055 StorageType StorageType 1056 1057 // KMSKeyName is the name of the KMS customer managed encryption key (CMEK) 1058 // to use for at-rest encryption of data in this cluster. If omitted, 1059 // Google's default encryption will be used. If specified, the requirements 1060 // for this key are: 1061 // 1) The Cloud Bigtable service account associated with the 1062 // project that contains the cluster must be granted the 1063 // ``cloudkms.cryptoKeyEncrypterDecrypter`` role on the 1064 // CMEK. 1065 // 2) Only regional keys can be used and the region of the 1066 // CMEK key must match the region of the cluster. 1067 // 3) All clusters within an instance must use the same CMEK 1068 // key. 
1069 // Optional. Immutable. 1070 KMSKeyName string 1071} 1072 1073func (cc *ClusterConfig) proto(project string) *btapb.Cluster { 1074 ec := btapb.Cluster_EncryptionConfig{} 1075 ec.KmsKeyName = cc.KMSKeyName 1076 return &btapb.Cluster{ 1077 ServeNodes: cc.NumNodes, 1078 DefaultStorageType: cc.StorageType.proto(), 1079 Location: "projects/" + project + "/locations/" + cc.Zone, 1080 EncryptionConfig: &ec, 1081 } 1082} 1083 1084// ClusterInfo represents information about a cluster. 1085type ClusterInfo struct { 1086 // Name is the name of the cluster. 1087 Name string 1088 1089 // Zone is the GCP zone of the cluster (e.g. "us-central1-a"). 1090 Zone string 1091 1092 // ServeNodes is the number of allocated serve nodes. 1093 ServeNodes int 1094 1095 // State is the state of the cluster. 1096 State string 1097 1098 // StorageType is the storage type of the cluster. 1099 StorageType StorageType 1100 1101 // KMSKeyName is the customer managed encryption key for the cluster. 1102 KMSKeyName string 1103} 1104 1105// CreateCluster creates a new cluster in an instance. 1106// This method will return when the cluster has been created or when an error occurs. 1107func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error { 1108 ctx = mergeOutgoingMetadata(ctx, iac.md) 1109 1110 req := &btapb.CreateClusterRequest{ 1111 Parent: "projects/" + iac.project + "/instances/" + conf.InstanceID, 1112 ClusterId: conf.ClusterID, 1113 Cluster: conf.proto(iac.project), 1114 } 1115 1116 lro, err := iac.iClient.CreateCluster(ctx, req) 1117 if err != nil { 1118 return err 1119 } 1120 resp := btapb.Cluster{} 1121 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 1122} 1123 1124// DeleteCluster deletes a cluster from an instance. 
1125func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error { 1126 ctx = mergeOutgoingMetadata(ctx, iac.md) 1127 req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 1128 _, err := iac.iClient.DeleteCluster(ctx, req) 1129 return err 1130} 1131 1132// UpdateCluster updates attributes of a cluster 1133func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error { 1134 ctx = mergeOutgoingMetadata(ctx, iac.md) 1135 cluster := &btapb.Cluster{ 1136 Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID, 1137 ServeNodes: serveNodes} 1138 lro, err := iac.iClient.UpdateCluster(ctx, cluster) 1139 if err != nil { 1140 return err 1141 } 1142 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 1143} 1144 1145// Clusters lists the clusters in an instance. If any location 1146// (cluster) is unavailable due to some transient conditions, Clusters 1147// returns partial results and ErrPartiallyUnavailable error with 1148// unavailable locations list. 1149func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) { 1150 ctx = mergeOutgoingMetadata(ctx, iac.md) 1151 req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID} 1152 var res *btapb.ListClustersResponse 1153 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1154 var err error 1155 res, err = iac.iClient.ListClusters(ctx, req) 1156 return err 1157 }, retryOptions...) 
1158 if err != nil { 1159 return nil, err 1160 } 1161 1162 var cis []*ClusterInfo 1163 for _, c := range res.Clusters { 1164 nameParts := strings.Split(c.Name, "/") 1165 locParts := strings.Split(c.Location, "/") 1166 kmsKeyName := "" 1167 if c.EncryptionConfig != nil { 1168 kmsKeyName = c.EncryptionConfig.KmsKeyName 1169 } 1170 cis = append(cis, &ClusterInfo{ 1171 Name: nameParts[len(nameParts)-1], 1172 Zone: locParts[len(locParts)-1], 1173 ServeNodes: int(c.ServeNodes), 1174 State: c.State.String(), 1175 StorageType: storageTypeFromProto(c.DefaultStorageType), 1176 KMSKeyName: kmsKeyName, 1177 }) 1178 } 1179 if len(res.FailedLocations) > 0 { 1180 // Return partial results and an error in 1181 // case of some locations are unavailable. 1182 return cis, ErrPartiallyUnavailable{res.FailedLocations} 1183 } 1184 return cis, nil 1185} 1186 1187// GetCluster fetches a cluster in an instance 1188func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) { 1189 ctx = mergeOutgoingMetadata(ctx, iac.md) 1190 req := &btapb.GetClusterRequest{ 1191 Name: fmt.Sprintf("projects/%s/instances/%s/clusters/%s", iac.project, instanceID, clusterID), 1192 } 1193 var c *btapb.Cluster 1194 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1195 var err error 1196 c, err = iac.iClient.GetCluster(ctx, req) 1197 return err 1198 }, retryOptions...) 
1199 if err != nil { 1200 return nil, err 1201 } 1202 1203 kmsKeyName := "" 1204 if c.EncryptionConfig != nil { 1205 kmsKeyName = c.EncryptionConfig.KmsKeyName 1206 } 1207 nameParts := strings.Split(c.Name, "/") 1208 locParts := strings.Split(c.Location, "/") 1209 cis := &ClusterInfo{ 1210 Name: nameParts[len(nameParts)-1], 1211 Zone: locParts[len(locParts)-1], 1212 ServeNodes: int(c.ServeNodes), 1213 State: c.State.String(), 1214 StorageType: storageTypeFromProto(c.DefaultStorageType), 1215 KMSKeyName: kmsKeyName, 1216 } 1217 return cis, nil 1218} 1219 1220// InstanceIAM returns the instance's IAM handle. 1221func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle { 1222 return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID) 1223} 1224 1225// Routing policies. 1226const ( 1227 // MultiClusterRouting is a policy that allows read/write requests to be 1228 // routed to any cluster in the instance. Requests will will fail over to 1229 // another cluster in the event of transient errors or delays. Choosing 1230 // this option sacrifices read-your-writes consistency to improve 1231 // availability. 1232 MultiClusterRouting = "multi_cluster_routing_use_any" 1233 // SingleClusterRouting is a policy that unconditionally routes all 1234 // read/write requests to a specific cluster. This option preserves 1235 // read-your-writes consistency, but does not improve availability. 1236 SingleClusterRouting = "single_cluster_routing" 1237) 1238 1239// ProfileConf contains the information necessary to create an profile 1240type ProfileConf struct { 1241 Name string 1242 ProfileID string 1243 InstanceID string 1244 Etag string 1245 Description string 1246 RoutingPolicy string 1247 ClusterID string 1248 AllowTransactionalWrites bool 1249 1250 // If true, warnings are ignored 1251 IgnoreWarnings bool 1252} 1253 1254// ProfileIterator iterates over profiles. 
1255type ProfileIterator struct { 1256 items []*btapb.AppProfile 1257 pageInfo *iterator.PageInfo 1258 nextFunc func() error 1259} 1260 1261// ProfileAttrsToUpdate define addrs to update during an Update call. If unset, no fields will be replaced. 1262type ProfileAttrsToUpdate struct { 1263 // If set, updates the description. 1264 Description optional.String 1265 1266 //If set, updates the routing policy. 1267 RoutingPolicy optional.String 1268 1269 //If RoutingPolicy is updated to SingleClusterRouting, set these fields as well. 1270 ClusterID string 1271 AllowTransactionalWrites bool 1272 1273 // If true, warnings are ignored 1274 IgnoreWarnings bool 1275} 1276 1277// GetFieldMaskPath returns the field mask path. 1278func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string { 1279 path := make([]string, 0) 1280 if p.Description != nil { 1281 path = append(path, "description") 1282 } 1283 1284 if p.RoutingPolicy != nil { 1285 path = append(path, optional.ToString(p.RoutingPolicy)) 1286 } 1287 return path 1288} 1289 1290// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 1291func (it *ProfileIterator) PageInfo() *iterator.PageInfo { 1292 return it.pageInfo 1293} 1294 1295// Next returns the next result. Its second return value is iterator.Done 1296// (https://godoc.org/google.golang.org/api/iterator) if there are no more 1297// results. Once Next returns Done, all subsequent calls will return Done. 1298func (it *ProfileIterator) Next() (*btapb.AppProfile, error) { 1299 if err := it.nextFunc(); err != nil { 1300 return nil, err 1301 } 1302 item := it.items[0] 1303 it.items = it.items[1:] 1304 return item, nil 1305} 1306 1307// CreateAppProfile creates an app profile within an instance. 
1308func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) { 1309 ctx = mergeOutgoingMetadata(ctx, iac.md) 1310 parent := "projects/" + iac.project + "/instances/" + profile.InstanceID 1311 appProfile := &btapb.AppProfile{ 1312 Etag: profile.Etag, 1313 Description: profile.Description, 1314 } 1315 1316 if profile.RoutingPolicy == "" { 1317 return nil, errors.New("invalid routing policy") 1318 } 1319 1320 switch profile.RoutingPolicy { 1321 case MultiClusterRouting: 1322 appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1323 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1324 } 1325 case SingleClusterRouting: 1326 appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1327 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1328 ClusterId: profile.ClusterID, 1329 AllowTransactionalWrites: profile.AllowTransactionalWrites, 1330 }, 1331 } 1332 default: 1333 return nil, errors.New("invalid routing policy") 1334 } 1335 1336 return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{ 1337 Parent: parent, 1338 AppProfile: appProfile, 1339 AppProfileId: profile.ProfileID, 1340 IgnoreWarnings: profile.IgnoreWarnings, 1341 }) 1342} 1343 1344// GetAppProfile gets information about an app profile. 1345func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) { 1346 ctx = mergeOutgoingMetadata(ctx, iac.md) 1347 profileRequest := &btapb.GetAppProfileRequest{ 1348 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1349 } 1350 var ap *btapb.AppProfile 1351 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1352 var err error 1353 ap, err = iac.iClient.GetAppProfile(ctx, profileRequest) 1354 return err 1355 }, retryOptions...) 
1356 if err != nil { 1357 return nil, err 1358 } 1359 return ap, err 1360} 1361 1362// ListAppProfiles lists information about app profiles in an instance. 1363func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator { 1364 ctx = mergeOutgoingMetadata(ctx, iac.md) 1365 listRequest := &btapb.ListAppProfilesRequest{ 1366 Parent: "projects/" + iac.project + "/instances/" + instanceID, 1367 } 1368 1369 pit := &ProfileIterator{} 1370 fetch := func(pageSize int, pageToken string) (string, error) { 1371 listRequest.PageToken = pageToken 1372 var profileRes *btapb.ListAppProfilesResponse 1373 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1374 var err error 1375 profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest) 1376 return err 1377 }, retryOptions...) 1378 if err != nil { 1379 return "", err 1380 } 1381 1382 pit.items = append(pit.items, profileRes.AppProfiles...) 1383 return profileRes.NextPageToken, nil 1384 } 1385 1386 bufLen := func() int { return len(pit.items) } 1387 takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b } 1388 pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 1389 return pit 1390 1391} 1392 1393// UpdateAppProfile updates an app profile within an instance. 1394// updateAttrs should be set. If unset, all fields will be replaced. 
1395func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error { 1396 ctx = mergeOutgoingMetadata(ctx, iac.md) 1397 1398 profile := &btapb.AppProfile{ 1399 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID, 1400 } 1401 1402 if updateAttrs.Description != nil { 1403 profile.Description = optional.ToString(updateAttrs.Description) 1404 } 1405 if updateAttrs.RoutingPolicy != nil { 1406 switch optional.ToString(updateAttrs.RoutingPolicy) { 1407 case MultiClusterRouting: 1408 profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1409 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1410 } 1411 case SingleClusterRouting: 1412 profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1413 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1414 ClusterId: updateAttrs.ClusterID, 1415 AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites, 1416 }, 1417 } 1418 default: 1419 return errors.New("invalid routing policy") 1420 } 1421 } 1422 patchRequest := &btapb.UpdateAppProfileRequest{ 1423 AppProfile: profile, 1424 UpdateMask: &field_mask.FieldMask{ 1425 Paths: updateAttrs.GetFieldMaskPath(), 1426 }, 1427 IgnoreWarnings: updateAttrs.IgnoreWarnings, 1428 } 1429 updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest) 1430 if err != nil { 1431 return err 1432 } 1433 1434 return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil) 1435 1436} 1437 1438// DeleteAppProfile deletes an app profile from an instance. 
1439func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error { 1440 ctx = mergeOutgoingMetadata(ctx, iac.md) 1441 deleteProfileRequest := &btapb.DeleteAppProfileRequest{ 1442 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1443 IgnoreWarnings: true, 1444 } 1445 _, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest) 1446 return err 1447 1448} 1449 1450// UpdateInstanceResults contains information about the 1451// changes made after invoking UpdateInstanceAndSyncClusters. 1452type UpdateInstanceResults struct { 1453 InstanceUpdated bool 1454 CreatedClusters []string 1455 DeletedClusters []string 1456 UpdatedClusters []string 1457} 1458 1459func (r *UpdateInstanceResults) String() string { 1460 return fmt.Sprintf("Instance updated? %v Clusters added:%v Clusters deleted:%v Clusters updated:%v", 1461 r.InstanceUpdated, r.CreatedClusters, r.DeletedClusters, r.UpdatedClusters) 1462} 1463 1464func max(x, y int) int { 1465 if x > y { 1466 return x 1467 } 1468 return y 1469} 1470 1471// UpdateInstanceAndSyncClusters updates an instance and its clusters, and will synchronize the 1472// clusters in the instance with the provided clusters, creating and deleting them as necessary. 1473// The provided InstanceWithClustersConfig is used as follows: 1474// - InstanceID is required 1475// - DisplayName and InstanceType are updated only if they are not empty 1476// - ClusterID is required for any provided cluster 1477// - Any cluster present in conf.Clusters but not part of the instance will be created using CreateCluster 1478// and the given ClusterConfig. 1479// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance 1480// using DeleteCluster. 1481// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the 1482// provided number of nodes if set. 
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
// determine the current state. The return UpdateInstanceResults will describe the work done by the
// method, whether partial or complete.
func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient, conf *InstanceWithClustersConfig) (*UpdateInstanceResults, error) {
	ctx = mergeOutgoingMetadata(ctx, iac.md)

	// First fetch the existing clusters so we know what to remove, add or update.
	// Any error here (including a partial listing) aborts before anything is modified.
	existingClusters, err := iac.Clusters(ctx, conf.InstanceID)
	if err != nil {
		return nil, err
	}

	// Apply instance-level changes before touching any cluster.
	updatedInstance, err := iac.updateInstance(ctx, conf)
	if err != nil {
		return nil, err
	}

	// From here on, results is returned alongside any error so the caller can
	// see how far the synchronization got.
	results := &UpdateInstanceResults{InstanceUpdated: updatedInstance}

	// Set of cluster names currently in the instance. Entries are removed as
	// they are matched against conf.Clusters; whatever remains gets deleted.
	existingClusterNames := make(map[string]bool)
	for _, cluster := range existingClusters {
		existingClusterNames[cluster.Name] = true
	}

	// Synchronize clusters that were passed in with the existing clusters in the instance.
	// First update any cluster we encounter that already exists in the instance.
	// Collect the clusters that we will create and delete so that we can minimize disruption
	// of the instance.
	clustersToCreate := list.New()
	clustersToDelete := list.New()
	for _, cluster := range conf.Clusters {
		_, clusterExists := existingClusterNames[cluster.ClusterID]
		if !clusterExists {
			// The cluster doesn't exist yet, so we must create it.
			clustersToCreate.PushBack(cluster)
			continue
		}
		delete(existingClusterNames, cluster.ClusterID)

		if cluster.NumNodes <= 0 {
			// We only synchronize clusters with a valid number of nodes.
			continue
		}

		// We simply want to update this cluster
		err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
		if err != nil {
			return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
				cluster.ClusterID, err, results)
		}
		results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
	}

	// Any cluster left in existingClusterNames was NOT in the given config and should be deleted.
	// Map iteration order is unspecified, so the order of deletions is not deterministic.
	for clusterToDelete := range existingClusterNames {
		clustersToDelete.PushBack(clusterToDelete)
	}

	// Now that we have the clusters that we need to create and delete, we do so keeping the following
	// in mind:
	// - Don't delete the last cluster in the instance, as that will result in an error.
	// - Attempt to offset each deletion with a creation before another deletion, so that instance
	//   capacity is never reduced more than necessary.
	// Note that there is a limit on number of clusters in an instance which we are not aware of here,
	// so delete a cluster before adding one (as long as there are > 1 clusters left) so that we are
	// less likely to exceed the maximum number of clusters.
	numExistingClusters := len(existingClusters)
	nextCreation := clustersToCreate.Front()
	nextDeletion := clustersToDelete.Front()
	for {
		// We are done when both lists are empty.
		if nextCreation == nil && nextDeletion == nil {
			break
		}

		// If there is more than one existing cluster, we always want to delete first if possible.
		// If there are no more creations left, always go ahead with the deletion.
		// (When nextCreation is nil here, nextDeletion cannot also be nil because of the check above.)
		if (numExistingClusters > 1 && nextDeletion != nil) || nextCreation == nil {
			clusterToDelete := nextDeletion.Value.(string)
			err = iac.DeleteCluster(ctx, conf.InstanceID, clusterToDelete)
			if err != nil {
				return results, fmt.Errorf("DeleteCluster %q failed %v; Progress: %v",
					clusterToDelete, err, results)
			}
			results.DeletedClusters = append(results.DeletedClusters, clusterToDelete)
			numExistingClusters--
			nextDeletion = nextDeletion.Next()
		}

		// Now create a new cluster if required.
		if nextCreation != nil {
			clusterToCreate := nextCreation.Value.(ClusterConfig)
			// Assume the cluster config is well formed and rely on the underlying call to error out.
			// Make sure to set the InstanceID, though, since we know what it must be.
			clusterToCreate.InstanceID = conf.InstanceID
			err = iac.CreateCluster(ctx, &clusterToCreate)
			if err != nil {
				return results, fmt.Errorf("CreateCluster %v failed %v; Progress: %v",
					clusterToCreate, err, results)
			}
			results.CreatedClusters = append(results.CreatedClusters, clusterToCreate.ClusterID)
			numExistingClusters++
			nextCreation = nextCreation.Next()
		}
	}

	return results, nil
}

// RestoreTable creates a table from a backup. The table will be created in the same cluster as the backup.
// To restore a table to a different instance, see RestoreTableFrom.
func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
	// The backup is looked up in the admin client's own instance.
	return ac.RestoreTableFrom(ctx, ac.instance, table, cluster, backup)
}

// RestoreTableFrom creates a new table in the admin's instance by restoring from the given backup and instance.
// To restore within the same instance, see RestoreTable.
// sourceInstance (ex. "my-instance") and sourceCluster (ex. "my-cluster") are the instance and cluster
// that hold the backup the new table will be restored from.
// table (ex. "my-restored-table") will be the name of the newly created table.
// backup (ex. "my-backup") is the name of the backup to restore.
// The call blocks until the long-running restore operation finishes or an error occurs.
func (ac *AdminClient) RestoreTableFrom(ctx context.Context, sourceInstance, table, sourceCluster, backup string) error {
	ctx = mergeOutgoingMetadata(ctx, ac.md)
	// The new table is always created under the admin client's own instance.
	parent := ac.instancePrefix()
	sourceBackupPath := ac.backupPath(sourceCluster, sourceInstance, backup)
	req := &btapb.RestoreTableRequest{
		Parent:  parent,
		TableId: table,
		Source:  &btapb.RestoreTableRequest_Backup{sourceBackupPath},
	}
	op, err := ac.tClient.RestoreTable(ctx, req)
	if err != nil {
		return err
	}
	resp := btapb.Table{}
	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// CreateBackup creates a new backup in the specified cluster from the
// specified source table with the user-provided expire time.
// The call blocks until the long-running backup operation finishes or an error occurs.
func (ac *AdminClient) CreateBackup(ctx context.Context, table, cluster, backup string, expireTime time.Time) error {
	ctx = mergeOutgoingMetadata(ctx, ac.md)
	prefix := ac.instancePrefix()

	// Convert the expiry to its protobuf form; a conversion error is returned
	// before any RPC is made.
	parsedExpireTime, err := ptypes.TimestampProto(expireTime)
	if err != nil {
		return err
	}

	req := &btapb.CreateBackupRequest{
		Parent:   prefix + "/clusters/" + cluster,
		BackupId: backup,
		Backup: &btapb.Backup{
			ExpireTime:  parsedExpireTime,
			SourceTable: prefix + "/tables/" + table,
		},
	}

	op, err := ac.tClient.CreateBackup(ctx, req)
	if err != nil {
		return err
	}
	resp := btapb.Backup{}
	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// Backups returns a BackupIterator for iterating over the backups in a cluster.
// To list backups across all of the clusters in the instance specify "-" as the cluster.
1652func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupIterator { 1653 ctx = mergeOutgoingMetadata(ctx, ac.md) 1654 prefix := ac.instancePrefix() 1655 clusterPath := prefix + "/clusters/" + cluster 1656 1657 it := &BackupIterator{} 1658 req := &btapb.ListBackupsRequest{ 1659 Parent: clusterPath, 1660 } 1661 1662 fetch := func(pageSize int, pageToken string) (string, error) { 1663 req.PageToken = pageToken 1664 if pageSize > math.MaxInt32 { 1665 req.PageSize = math.MaxInt32 1666 } else { 1667 req.PageSize = int32(pageSize) 1668 } 1669 1670 var resp *btapb.ListBackupsResponse 1671 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1672 var err error 1673 resp, err = ac.tClient.ListBackups(ctx, req) 1674 return err 1675 }, retryOptions...) 1676 if err != nil { 1677 return "", err 1678 } 1679 for _, s := range resp.Backups { 1680 backupInfo, err := newBackupInfo(s) 1681 if err != nil { 1682 return "", fmt.Errorf("failed to parse backup proto %v", err) 1683 } 1684 it.items = append(it.items, backupInfo) 1685 } 1686 return resp.NextPageToken, nil 1687 } 1688 bufLen := func() int { return len(it.items) } 1689 takeBuf := func() interface{} { b := it.items; it.items = nil; return b } 1690 1691 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 1692 1693 return it 1694} 1695 1696// newBackupInfo creates a BackupInfo struct from a btapb.Backup protocol buffer. 
1697func newBackupInfo(backup *btapb.Backup) (*BackupInfo, error) { 1698 nameParts := strings.Split(backup.Name, "/") 1699 name := nameParts[len(nameParts)-1] 1700 tablePathParts := strings.Split(backup.SourceTable, "/") 1701 tableID := tablePathParts[len(tablePathParts)-1] 1702 1703 startTime, err := ptypes.Timestamp(backup.StartTime) 1704 if err != nil { 1705 return nil, fmt.Errorf("invalid startTime: %v", err) 1706 } 1707 1708 endTime, err := ptypes.Timestamp(backup.EndTime) 1709 if err != nil { 1710 return nil, fmt.Errorf("invalid endTime: %v", err) 1711 } 1712 1713 expireTime, err := ptypes.Timestamp(backup.ExpireTime) 1714 if err != nil { 1715 return nil, fmt.Errorf("invalid expireTime: %v", err) 1716 } 1717 encryptionInfo := newEncryptionInfo(backup.EncryptionInfo) 1718 bi := BackupInfo{ 1719 Name: name, 1720 SourceTable: tableID, 1721 SizeBytes: backup.SizeBytes, 1722 StartTime: startTime, 1723 EndTime: endTime, 1724 ExpireTime: expireTime, 1725 State: backup.State.String(), 1726 EncryptionInfo: encryptionInfo, 1727 } 1728 1729 return &bi, nil 1730} 1731 1732// BackupIterator is an EntryIterator that iterates over log entries. 1733type BackupIterator struct { 1734 items []*BackupInfo 1735 pageInfo *iterator.PageInfo 1736 nextFunc func() error 1737} 1738 1739// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 1740func (it *BackupIterator) PageInfo() *iterator.PageInfo { 1741 return it.pageInfo 1742} 1743 1744// Next returns the next result. Its second return value is iterator.Done 1745// (https://godoc.org/google.golang.org/api/iterator) if there are no more 1746// results. Once Next returns Done, all subsequent calls will return Done. 1747func (it *BackupIterator) Next() (*BackupInfo, error) { 1748 if err := it.nextFunc(); err != nil { 1749 return nil, err 1750 } 1751 item := it.items[0] 1752 it.items = it.items[1:] 1753 return item, nil 1754} 1755 1756// BackupInfo contains backup metadata. 
This struct is read-only. 1757type BackupInfo struct { 1758 Name string 1759 SourceTable string 1760 SizeBytes int64 1761 StartTime time.Time 1762 EndTime time.Time 1763 ExpireTime time.Time 1764 State string 1765 EncryptionInfo *EncryptionInfo 1766} 1767 1768// BackupInfo gets backup metadata. 1769func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) { 1770 ctx = mergeOutgoingMetadata(ctx, ac.md) 1771 backupPath := ac.backupPath(cluster, ac.instance, backup) 1772 1773 req := &btapb.GetBackupRequest{ 1774 Name: backupPath, 1775 } 1776 1777 var resp *btapb.Backup 1778 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 1779 var err error 1780 resp, err = ac.tClient.GetBackup(ctx, req) 1781 return err 1782 }, retryOptions...) 1783 if err != nil { 1784 return nil, err 1785 } 1786 1787 return newBackupInfo(resp) 1788} 1789 1790// DeleteBackup deletes a backup in a cluster. 1791func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error { 1792 ctx = mergeOutgoingMetadata(ctx, ac.md) 1793 backupPath := ac.backupPath(cluster, ac.instance, backup) 1794 1795 req := &btapb.DeleteBackupRequest{ 1796 Name: backupPath, 1797 } 1798 _, err := ac.tClient.DeleteBackup(ctx, req) 1799 return err 1800} 1801 1802// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time. 
1803func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error { 1804 ctx = mergeOutgoingMetadata(ctx, ac.md) 1805 backupPath := ac.backupPath(cluster, ac.instance, backup) 1806 1807 expireTimestamp, err := ptypes.TimestampProto(expireTime) 1808 if err != nil { 1809 return err 1810 } 1811 1812 updateMask := &field_mask.FieldMask{} 1813 updateMask.Paths = append(updateMask.Paths, "expire_time") 1814 1815 req := &btapb.UpdateBackupRequest{ 1816 Backup: &btapb.Backup{ 1817 Name: backupPath, 1818 ExpireTime: expireTimestamp, 1819 }, 1820 UpdateMask: updateMask, 1821 } 1822 _, err = ac.tClient.UpdateBackup(ctx, req) 1823 return err 1824} 1825