1/* 2Copyright 2015 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "math" 24 "regexp" 25 "strings" 26 "time" 27 28 "cloud.google.com/go/bigtable/internal/gax" 29 btopt "cloud.google.com/go/bigtable/internal/option" 30 "cloud.google.com/go/iam" 31 "cloud.google.com/go/internal/optional" 32 "cloud.google.com/go/longrunning" 33 lroauto "cloud.google.com/go/longrunning/autogen" 34 "github.com/golang/protobuf/ptypes" 35 durpb "github.com/golang/protobuf/ptypes/duration" 36 "google.golang.org/api/cloudresourcemanager/v1" 37 "google.golang.org/api/iterator" 38 "google.golang.org/api/option" 39 gtransport "google.golang.org/api/transport/grpc" 40 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 41 "google.golang.org/genproto/protobuf/field_mask" 42 "google.golang.org/grpc" 43 "google.golang.org/grpc/codes" 44 "google.golang.org/grpc/metadata" 45 "google.golang.org/grpc/status" 46) 47 48const adminAddr = "bigtableadmin.googleapis.com:443" 49 50// AdminClient is a client type for performing admin operations within a specific instance. 51type AdminClient struct { 52 conn *grpc.ClientConn 53 tClient btapb.BigtableTableAdminClient 54 lroClient *lroauto.OperationsClient 55 56 project, instance string 57 58 // Metadata to be sent with each request. 59 md metadata.MD 60} 61 62// NewAdminClient creates a new AdminClient for a given project and instance. 63func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) { 64 o, err := btopt.DefaultClientOptions(adminAddr, AdminScope, clientUserAgent) 65 if err != nil { 66 return nil, err 67 } 68 // Need to add scopes for long running operations (for create table & snapshots) 69 o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope)) 70 o = append(o, opts...) 71 conn, err := gtransport.Dial(ctx, o...) 72 if err != nil { 73 return nil, fmt.Errorf("dialing: %v", err) 74 } 75 76 lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn)) 77 if err != nil { 78 // This error "should not happen", since we are just reusing old connection 79 // and never actually need to dial. 80 // If this does happen, we could leak conn. However, we cannot close conn: 81 // If the user invoked the function with option.WithGRPCConn, 82 // we would close a connection that's still in use. 83 // TODO(pongad): investigate error conditions. 84 return nil, err 85 } 86 87 return &AdminClient{ 88 conn: conn, 89 tClient: btapb.NewBigtableTableAdminClient(conn), 90 lroClient: lroClient, 91 project: project, 92 instance: instance, 93 md: metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)), 94 }, nil 95} 96 97// Close closes the AdminClient. 98func (ac *AdminClient) Close() error { 99 return ac.conn.Close() 100} 101 102func (ac *AdminClient) instancePrefix() string { 103 return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance) 104} 105 106// Tables returns a list of the tables in the instance. 107func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { 108 ctx = mergeOutgoingMetadata(ctx, ac.md) 109 prefix := ac.instancePrefix() 110 req := &btapb.ListTablesRequest{ 111 Parent: prefix, 112 } 113 114 var res *btapb.ListTablesResponse 115 err := gax.Invoke(ctx, func(ctx context.Context) error { 116 var err error 117 res, err = ac.tClient.ListTables(ctx, req) 118 return err 119 }, retryOptions...) 120 if err != nil { 121 return nil, err 122 } 123 124 names := make([]string, 0, len(res.Tables)) 125 for _, tbl := range res.Tables { 126 names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/")) 127 } 128 return names, nil 129} 130 131// TableConf contains all of the information necessary to create a table with column families. 132type TableConf struct { 133 TableID string 134 SplitKeys []string 135 // Families is a map from family name to GCPolicy 136 Families map[string]GCPolicy 137} 138 139// CreateTable creates a new table in the instance. 140// This method may return before the table's creation is complete. 141func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { 142 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table}) 143} 144 145// CreatePresplitTable creates a new table in the instance. 146// The list of row keys will be used to initially split the table into multiple tablets. 147// Given two split keys, "s1" and "s2", three tablets will be created, 148// spanning the key ranges: [, s1), [s1, s2), [s2, ). 149// This method may return before the table's creation is complete. 150func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error { 151 return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys}) 152} 153 154// CreateTableFromConf creates a new table in the instance from the given configuration. 155func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error { 156 ctx = mergeOutgoingMetadata(ctx, ac.md) 157 var reqSplits []*btapb.CreateTableRequest_Split 158 for _, split := range conf.SplitKeys { 159 reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)}) 160 } 161 var tbl btapb.Table 162 if conf.Families != nil { 163 tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) 164 for fam, policy := range conf.Families { 165 tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()} 166 } 167 } 168 prefix := ac.instancePrefix() 169 req := &btapb.CreateTableRequest{ 170 Parent: prefix, 171 TableId: conf.TableID, 172 Table: &tbl, 173 InitialSplits: reqSplits, 174 } 175 _, err := ac.tClient.CreateTable(ctx, req) 176 return err 177} 178 179// CreateColumnFamily creates a new column family in a table. 180func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error { 181 // TODO(dsymonds): Permit specifying gcexpr and any other family settings. 182 ctx = mergeOutgoingMetadata(ctx, ac.md) 183 prefix := ac.instancePrefix() 184 req := &btapb.ModifyColumnFamiliesRequest{ 185 Name: prefix + "/tables/" + table, 186 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 187 Id: family, 188 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, 189 }}, 190 } 191 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 192 return err 193} 194 195// DeleteTable deletes a table and all of its data. 196func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error { 197 ctx = mergeOutgoingMetadata(ctx, ac.md) 198 prefix := ac.instancePrefix() 199 req := &btapb.DeleteTableRequest{ 200 Name: prefix + "/tables/" + table, 201 } 202 _, err := ac.tClient.DeleteTable(ctx, req) 203 return err 204} 205 206// DeleteColumnFamily deletes a column family in a table and all of its data. 207func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error { 208 ctx = mergeOutgoingMetadata(ctx, ac.md) 209 prefix := ac.instancePrefix() 210 req := &btapb.ModifyColumnFamiliesRequest{ 211 Name: prefix + "/tables/" + table, 212 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 213 Id: family, 214 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true}, 215 }}, 216 } 217 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 218 return err 219} 220 221// TableInfo represents information about a table. 222type TableInfo struct { 223 // DEPRECATED - This field is deprecated. Please use FamilyInfos instead. 224 Families []string 225 FamilyInfos []FamilyInfo 226} 227 228// FamilyInfo represents information about a column family. 229type FamilyInfo struct { 230 Name string 231 GCPolicy string 232} 233 234// TableInfo retrieves information about a table. 235func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) { 236 ctx = mergeOutgoingMetadata(ctx, ac.md) 237 prefix := ac.instancePrefix() 238 req := &btapb.GetTableRequest{ 239 Name: prefix + "/tables/" + table, 240 } 241 242 var res *btapb.Table 243 244 err := gax.Invoke(ctx, func(ctx context.Context) error { 245 var err error 246 res, err = ac.tClient.GetTable(ctx, req) 247 return err 248 }, retryOptions...) 249 if err != nil { 250 return nil, err 251 } 252 253 ti := &TableInfo{} 254 for name, fam := range res.ColumnFamilies { 255 ti.Families = append(ti.Families, name) 256 ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)}) 257 } 258 return ti, nil 259} 260 261// SetGCPolicy specifies which cells in a column family should be garbage collected. 262// GC executes opportunistically in the background; table reads may return data 263// matching the GC policy. 264func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { 265 ctx = mergeOutgoingMetadata(ctx, ac.md) 266 prefix := ac.instancePrefix() 267 req := &btapb.ModifyColumnFamiliesRequest{ 268 Name: prefix + "/tables/" + table, 269 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 270 Id: family, 271 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}}, 272 }}, 273 } 274 _, err := ac.tClient.ModifyColumnFamilies(ctx, req) 275 return err 276} 277 278// DropRowRange permanently deletes a row range from the specified table. 279func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error { 280 ctx = mergeOutgoingMetadata(ctx, ac.md) 281 prefix := ac.instancePrefix() 282 req := &btapb.DropRowRangeRequest{ 283 Name: prefix + "/tables/" + table, 284 Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)}, 285 } 286 _, err := ac.tClient.DropRowRange(ctx, req) 287 return err 288} 289 290// CreateTableFromSnapshot creates a table from snapshot. 291// The table will be created in the same cluster as the snapshot. 292// 293// This is a private alpha release of Cloud Bigtable snapshots. This feature 294// is not currently available to most Cloud Bigtable customers. This feature 295// might be changed in backward-incompatible ways and is not recommended for 296// production use. It is not subject to any SLA or deprecation policy. 297func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error { 298 ctx = mergeOutgoingMetadata(ctx, ac.md) 299 prefix := ac.instancePrefix() 300 snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot 301 302 req := &btapb.CreateTableFromSnapshotRequest{ 303 Parent: prefix, 304 TableId: table, 305 SourceSnapshot: snapshotPath, 306 } 307 op, err := ac.tClient.CreateTableFromSnapshot(ctx, req) 308 if err != nil { 309 return err 310 } 311 resp := btapb.Table{} 312 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 313} 314 315// DefaultSnapshotDuration is the default TTL for a snapshot. 316const DefaultSnapshotDuration time.Duration = 0 317 318// SnapshotTable creates a new snapshot in the specified cluster from the 319// specified source table. Setting the TTL to `DefaultSnapshotDuration` will 320// use the server side default for the duration. 321// 322// This is a private alpha release of Cloud Bigtable snapshots. This feature 323// is not currently available to most Cloud Bigtable customers. This feature 324// might be changed in backward-incompatible ways and is not recommended for 325// production use. It is not subject to any SLA or deprecation policy. 326func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error { 327 ctx = mergeOutgoingMetadata(ctx, ac.md) 328 prefix := ac.instancePrefix() 329 330 var ttlProto *durpb.Duration 331 332 if ttl > 0 { 333 ttlProto = ptypes.DurationProto(ttl) 334 } 335 336 req := &btapb.SnapshotTableRequest{ 337 Name: prefix + "/tables/" + table, 338 Cluster: prefix + "/clusters/" + cluster, 339 SnapshotId: snapshot, 340 Ttl: ttlProto, 341 } 342 343 op, err := ac.tClient.SnapshotTable(ctx, req) 344 if err != nil { 345 return err 346 } 347 resp := btapb.Snapshot{} 348 return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp) 349} 350 351// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster. 352// To list snapshots across all of the clusters in the instance specify "-" as the cluster. 353// 354// This is a private alpha release of Cloud Bigtable snapshots. This feature is not 355// currently available to most Cloud Bigtable customers. This feature might be 356// changed in backward-incompatible ways and is not recommended for production use. 357// It is not subject to any SLA or deprecation policy. 358func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator { 359 ctx = mergeOutgoingMetadata(ctx, ac.md) 360 prefix := ac.instancePrefix() 361 clusterPath := prefix + "/clusters/" + cluster 362 363 it := &SnapshotIterator{} 364 req := &btapb.ListSnapshotsRequest{ 365 Parent: clusterPath, 366 } 367 368 fetch := func(pageSize int, pageToken string) (string, error) { 369 req.PageToken = pageToken 370 if pageSize > math.MaxInt32 { 371 req.PageSize = math.MaxInt32 372 } else { 373 req.PageSize = int32(pageSize) 374 } 375 376 var resp *btapb.ListSnapshotsResponse 377 err := gax.Invoke(ctx, func(ctx context.Context) error { 378 var err error 379 resp, err = ac.tClient.ListSnapshots(ctx, req) 380 return err 381 }, retryOptions...) 382 if err != nil { 383 return "", err 384 } 385 for _, s := range resp.Snapshots { 386 snapshotInfo, err := newSnapshotInfo(s) 387 if err != nil { 388 return "", fmt.Errorf("failed to parse snapshot proto %v", err) 389 } 390 it.items = append(it.items, snapshotInfo) 391 } 392 return resp.NextPageToken, nil 393 } 394 bufLen := func() int { return len(it.items) } 395 takeBuf := func() interface{} { b := it.items; it.items = nil; return b } 396 397 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 398 399 return it 400} 401 402func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) { 403 nameParts := strings.Split(snapshot.Name, "/") 404 name := nameParts[len(nameParts)-1] 405 tablePathParts := strings.Split(snapshot.SourceTable.Name, "/") 406 tableID := tablePathParts[len(tablePathParts)-1] 407 408 createTime, err := ptypes.Timestamp(snapshot.CreateTime) 409 if err != nil { 410 return nil, fmt.Errorf("invalid createTime: %v", err) 411 } 412 413 deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime) 414 if err != nil { 415 return nil, fmt.Errorf("invalid deleteTime: %v", err) 416 } 417 418 return &SnapshotInfo{ 419 Name: name, 420 SourceTable: tableID, 421 DataSize: snapshot.DataSizeBytes, 422 CreateTime: createTime, 423 DeleteTime: deleteTime, 424 }, nil 425} 426 427// SnapshotIterator is an EntryIterator that iterates over log entries. 428// 429// This is a private alpha release of Cloud Bigtable snapshots. This feature 430// is not currently available to most Cloud Bigtable customers. This feature 431// might be changed in backward-incompatible ways and is not recommended for 432// production use. It is not subject to any SLA or deprecation policy. 433type SnapshotIterator struct { 434 items []*SnapshotInfo 435 pageInfo *iterator.PageInfo 436 nextFunc func() error 437} 438 439// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 440func (it *SnapshotIterator) PageInfo() *iterator.PageInfo { 441 return it.pageInfo 442} 443 444// Next returns the next result. Its second return value is iterator.Done 445// (https://godoc.org/google.golang.org/api/iterator) if there are no more 446// results. Once Next returns Done, all subsequent calls will return Done. 447func (it *SnapshotIterator) Next() (*SnapshotInfo, error) { 448 if err := it.nextFunc(); err != nil { 449 return nil, err 450 } 451 item := it.items[0] 452 it.items = it.items[1:] 453 return item, nil 454} 455 456// SnapshotInfo contains snapshot metadata. 457type SnapshotInfo struct { 458 Name string 459 SourceTable string 460 DataSize int64 461 CreateTime time.Time 462 DeleteTime time.Time 463} 464 465// SnapshotInfo gets snapshot metadata. 466// 467// This is a private alpha release of Cloud Bigtable snapshots. This feature 468// is not currently available to most Cloud Bigtable customers. This feature 469// might be changed in backward-incompatible ways and is not recommended for 470// production use. It is not subject to any SLA or deprecation policy. 471func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) { 472 ctx = mergeOutgoingMetadata(ctx, ac.md) 473 prefix := ac.instancePrefix() 474 clusterPath := prefix + "/clusters/" + cluster 475 snapshotPath := clusterPath + "/snapshots/" + snapshot 476 477 req := &btapb.GetSnapshotRequest{ 478 Name: snapshotPath, 479 } 480 481 var resp *btapb.Snapshot 482 err := gax.Invoke(ctx, func(ctx context.Context) error { 483 var err error 484 resp, err = ac.tClient.GetSnapshot(ctx, req) 485 return err 486 }, retryOptions...) 487 if err != nil { 488 return nil, err 489 } 490 491 return newSnapshotInfo(resp) 492} 493 494// DeleteSnapshot deletes a snapshot in a cluster. 495// 496// This is a private alpha release of Cloud Bigtable snapshots. This feature 497// is not currently available to most Cloud Bigtable customers. This feature 498// might be changed in backward-incompatible ways and is not recommended for 499// production use. It is not subject to any SLA or deprecation policy. 500func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error { 501 ctx = mergeOutgoingMetadata(ctx, ac.md) 502 prefix := ac.instancePrefix() 503 clusterPath := prefix + "/clusters/" + cluster 504 snapshotPath := clusterPath + "/snapshots/" + snapshot 505 506 req := &btapb.DeleteSnapshotRequest{ 507 Name: snapshotPath, 508 } 509 _, err := ac.tClient.DeleteSnapshot(ctx, req) 510 return err 511} 512 513// getConsistencyToken gets the consistency token for a table. 514func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) { 515 req := &btapb.GenerateConsistencyTokenRequest{ 516 Name: tableName, 517 } 518 resp, err := ac.tClient.GenerateConsistencyToken(ctx, req) 519 if err != nil { 520 return "", err 521 } 522 return resp.GetConsistencyToken(), nil 523} 524 525// isConsistent checks if a token is consistent for a table. 526func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) { 527 req := &btapb.CheckConsistencyRequest{ 528 Name: tableName, 529 ConsistencyToken: token, 530 } 531 var resp *btapb.CheckConsistencyResponse 532 533 // Retry calls on retryable errors to avoid losing the token gathered before. 534 err := gax.Invoke(ctx, func(ctx context.Context) error { 535 var err error 536 resp, err = ac.tClient.CheckConsistency(ctx, req) 537 return err 538 }, retryOptions...) 539 if err != nil { 540 return false, err 541 } 542 return resp.GetConsistent(), nil 543} 544 545// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication. 546func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error { 547 // Get the token. 548 prefix := ac.instancePrefix() 549 tableName := prefix + "/tables/" + table 550 token, err := ac.getConsistencyToken(ctx, tableName) 551 if err != nil { 552 return err 553 } 554 555 // Periodically check if the token is consistent. 556 timer := time.NewTicker(time.Second * 10) 557 defer timer.Stop() 558 for { 559 consistent, err := ac.isConsistent(ctx, tableName, token) 560 if err != nil { 561 return err 562 } 563 if consistent { 564 return nil 565 } 566 // Sleep for a bit or until the ctx is cancelled. 567 select { 568 case <-ctx.Done(): 569 return ctx.Err() 570 case <-timer.C: 571 } 572 } 573} 574 575const instanceAdminAddr = "bigtableadmin.googleapis.com:443" 576 577// InstanceAdminClient is a client type for performing admin operations on instances. 578// These operations can be substantially more dangerous than those provided by AdminClient. 579type InstanceAdminClient struct { 580 conn *grpc.ClientConn 581 iClient btapb.BigtableInstanceAdminClient 582 lroClient *lroauto.OperationsClient 583 584 project string 585 586 // Metadata to be sent with each request. 587 md metadata.MD 588} 589 590// NewInstanceAdminClient creates a new InstanceAdminClient for a given project. 591func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) { 592 o, err := btopt.DefaultClientOptions(instanceAdminAddr, InstanceAdminScope, clientUserAgent) 593 if err != nil { 594 return nil, err 595 } 596 o = append(o, opts...) 597 conn, err := gtransport.Dial(ctx, o...) 598 if err != nil { 599 return nil, fmt.Errorf("dialing: %v", err) 600 } 601 602 lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn)) 603 if err != nil { 604 // This error "should not happen", since we are just reusing old connection 605 // and never actually need to dial. 606 // If this does happen, we could leak conn. However, we cannot close conn: 607 // If the user invoked the function with option.WithGRPCConn, 608 // we would close a connection that's still in use. 609 // TODO(pongad): investigate error conditions. 610 return nil, err 611 } 612 613 return &InstanceAdminClient{ 614 conn: conn, 615 iClient: btapb.NewBigtableInstanceAdminClient(conn), 616 lroClient: lroClient, 617 618 project: project, 619 md: metadata.Pairs(resourcePrefixHeader, "projects/"+project), 620 }, nil 621} 622 623// Close closes the InstanceAdminClient. 624func (iac *InstanceAdminClient) Close() error { 625 return iac.conn.Close() 626} 627 628// StorageType is the type of storage used for all tables in an instance 629type StorageType int 630 631const ( 632 SSD StorageType = iota 633 HDD 634) 635 636func (st StorageType) proto() btapb.StorageType { 637 if st == HDD { 638 return btapb.StorageType_HDD 639 } 640 return btapb.StorageType_SSD 641} 642 643// InstanceType is the type of the instance 644type InstanceType int32 645 646const ( 647 PRODUCTION InstanceType = InstanceType(btapb.Instance_PRODUCTION) 648 DEVELOPMENT = InstanceType(btapb.Instance_DEVELOPMENT) 649) 650 651// InstanceInfo represents information about an instance 652type InstanceInfo struct { 653 Name string // name of the instance 654 DisplayName string // display name for UIs 655} 656 657// InstanceConf contains the information necessary to create an Instance 658type InstanceConf struct { 659 InstanceId, DisplayName, ClusterId, Zone string 660 // NumNodes must not be specified for DEVELOPMENT instance types 661 NumNodes int32 662 StorageType StorageType 663 InstanceType InstanceType 664} 665 666// InstanceWithClustersConfig contains the information necessary to create an Instance 667type InstanceWithClustersConfig struct { 668 InstanceID, DisplayName string 669 Clusters []ClusterConfig 670 InstanceType InstanceType 671} 672 673var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`) 674 675// CreateInstance creates a new instance in the project. 676// This method will return when the instance has been created or when an error occurs. 677func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error { 678 newConfig := InstanceWithClustersConfig{ 679 InstanceID: conf.InstanceId, 680 DisplayName: conf.DisplayName, 681 InstanceType: conf.InstanceType, 682 Clusters: []ClusterConfig{ 683 { 684 InstanceID: conf.InstanceId, 685 ClusterID: conf.ClusterId, 686 Zone: conf.Zone, 687 NumNodes: conf.NumNodes, 688 StorageType: conf.StorageType, 689 }, 690 }, 691 } 692 return iac.CreateInstanceWithClusters(ctx, &newConfig) 693} 694 695// CreateInstanceWithClusters creates a new instance with configured clusters in the project. 696// This method will return when the instance has been created or when an error occurs. 697func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error { 698 ctx = mergeOutgoingMetadata(ctx, iac.md) 699 clusters := make(map[string]*btapb.Cluster) 700 for _, cluster := range conf.Clusters { 701 clusters[cluster.ClusterID] = cluster.proto(iac.project) 702 } 703 704 req := &btapb.CreateInstanceRequest{ 705 Parent: "projects/" + iac.project, 706 InstanceId: conf.InstanceID, 707 Instance: &btapb.Instance{DisplayName: conf.DisplayName, Type: btapb.Instance_Type(conf.InstanceType)}, 708 Clusters: clusters, 709 } 710 711 lro, err := iac.iClient.CreateInstance(ctx, req) 712 if err != nil { 713 return err 714 } 715 resp := btapb.Instance{} 716 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 717} 718 719// DeleteInstance deletes an instance from the project. 720func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error { 721 ctx = mergeOutgoingMetadata(ctx, iac.md) 722 req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID} 723 _, err := iac.iClient.DeleteInstance(ctx, req) 724 return err 725} 726 727// Instances returns a list of instances in the project. 728func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) { 729 ctx = mergeOutgoingMetadata(ctx, iac.md) 730 req := &btapb.ListInstancesRequest{ 731 Parent: "projects/" + iac.project, 732 } 733 var res *btapb.ListInstancesResponse 734 err := gax.Invoke(ctx, func(ctx context.Context) error { 735 var err error 736 res, err = iac.iClient.ListInstances(ctx, req) 737 return err 738 }, retryOptions...) 739 if err != nil { 740 return nil, err 741 } 742 if len(res.FailedLocations) > 0 { 743 // We don't have a good way to return a partial result in the face of some zones being unavailable. 744 // Fail the entire request. 745 return nil, status.Errorf(codes.Unavailable, "Failed locations: %v", res.FailedLocations) 746 } 747 748 var is []*InstanceInfo 749 for _, i := range res.Instances { 750 m := instanceNameRegexp.FindStringSubmatch(i.Name) 751 if m == nil { 752 return nil, fmt.Errorf("malformed instance name %q", i.Name) 753 } 754 is = append(is, &InstanceInfo{ 755 Name: m[2], 756 DisplayName: i.DisplayName, 757 }) 758 } 759 return is, nil 760} 761 762// InstanceInfo returns information about an instance. 763func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) { 764 ctx = mergeOutgoingMetadata(ctx, iac.md) 765 req := &btapb.GetInstanceRequest{ 766 Name: "projects/" + iac.project + "/instances/" + instanceID, 767 } 768 var res *btapb.Instance 769 err := gax.Invoke(ctx, func(ctx context.Context) error { 770 var err error 771 res, err = iac.iClient.GetInstance(ctx, req) 772 return err 773 }, retryOptions...) 774 if err != nil { 775 return nil, err 776 } 777 778 m := instanceNameRegexp.FindStringSubmatch(res.Name) 779 if m == nil { 780 return nil, fmt.Errorf("malformed instance name %q", res.Name) 781 } 782 return &InstanceInfo{ 783 Name: m[2], 784 DisplayName: res.DisplayName, 785 }, nil 786} 787 788// ClusterConfig contains the information necessary to create a cluster 789type ClusterConfig struct { 790 InstanceID, ClusterID, Zone string 791 NumNodes int32 792 StorageType StorageType 793} 794 795func (cc *ClusterConfig) proto(project string) *btapb.Cluster { 796 return &btapb.Cluster{ 797 ServeNodes: cc.NumNodes, 798 DefaultStorageType: cc.StorageType.proto(), 799 Location: "projects/" + project + "/locations/" + cc.Zone, 800 } 801} 802 803// ClusterInfo represents information about a cluster. 804type ClusterInfo struct { 805 Name string // name of the cluster 806 Zone string // GCP zone of the cluster (e.g. "us-central1-a") 807 ServeNodes int // number of allocated serve nodes 808 State string // state of the cluster 809} 810 811// CreateCluster creates a new cluster in an instance. 812// This method will return when the cluster has been created or when an error occurs. 813func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error { 814 ctx = mergeOutgoingMetadata(ctx, iac.md) 815 816 req := &btapb.CreateClusterRequest{ 817 Parent: "projects/" + iac.project + "/instances/" + conf.InstanceID, 818 ClusterId: conf.ClusterID, 819 Cluster: conf.proto(iac.project), 820 } 821 822 lro, err := iac.iClient.CreateCluster(ctx, req) 823 if err != nil { 824 return err 825 } 826 resp := btapb.Cluster{} 827 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp) 828} 829 830// DeleteCluster deletes a cluster from an instance. 831func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error { 832 ctx = mergeOutgoingMetadata(ctx, iac.md) 833 req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 834 _, err := iac.iClient.DeleteCluster(ctx, req) 835 return err 836} 837 838// UpdateCluster updates attributes of a cluster 839func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error { 840 ctx = mergeOutgoingMetadata(ctx, iac.md) 841 cluster := &btapb.Cluster{ 842 Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID, 843 ServeNodes: serveNodes} 844 lro, err := iac.iClient.UpdateCluster(ctx, cluster) 845 if err != nil { 846 return err 847 } 848 return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil) 849} 850 851// Clusters lists the clusters in an instance. 852func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) { 853 ctx = mergeOutgoingMetadata(ctx, iac.md) 854 req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID} 855 var res *btapb.ListClustersResponse 856 err := gax.Invoke(ctx, func(ctx context.Context) error { 857 var err error 858 res, err = iac.iClient.ListClusters(ctx, req) 859 return err 860 }, retryOptions...) 861 if err != nil { 862 return nil, err 863 } 864 // TODO(garyelliott): Deal with failed_locations. 865 var cis []*ClusterInfo 866 for _, c := range res.Clusters { 867 nameParts := strings.Split(c.Name, "/") 868 locParts := strings.Split(c.Location, "/") 869 cis = append(cis, &ClusterInfo{ 870 Name: nameParts[len(nameParts)-1], 871 Zone: locParts[len(locParts)-1], 872 ServeNodes: int(c.ServeNodes), 873 State: c.State.String(), 874 }) 875 } 876 return cis, nil 877} 878 879// GetCluster fetches a cluster in an instance 880func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) { 881 ctx = mergeOutgoingMetadata(ctx, iac.md) 882 req := &btapb.GetClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID} 883 var c *btapb.Cluster 884 err := gax.Invoke(ctx, func(ctx context.Context) error { 885 var err error 886 c, err = iac.iClient.GetCluster(ctx, req) 887 return err 888 }, retryOptions...) 889 if err != nil { 890 return nil, err 891 } 892 893 nameParts := strings.Split(c.Name, "/") 894 locParts := strings.Split(c.Location, "/") 895 cis := &ClusterInfo{ 896 Name: nameParts[len(nameParts)-1], 897 Zone: locParts[len(locParts)-1], 898 ServeNodes: int(c.ServeNodes), 899 State: c.State.String(), 900 } 901 return cis, nil 902} 903 904// InstanceIAM returns the instance's IAM handle. 905func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle { 906 return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID) 907 908} 909 910// Routing policies. 911const ( 912 // MultiClusterRouting is a policy that allows read/write requests to be 913 // routed to any cluster in the instance. Requests will will fail over to 914 // another cluster in the event of transient errors or delays. Choosing 915 // this option sacrifices read-your-writes consistency to improve 916 // availability. 917 MultiClusterRouting = "multi_cluster_routing_use_any" 918 // SingleClusterRouting is a policy that unconditionally routes all 919 // read/write requests to a specific cluster. This option preserves 920 // read-your-writes consistency, but does not improve availability. 921 SingleClusterRouting = "single_cluster_routing" 922) 923 924// ProfileConf contains the information necessary to create an profile 925type ProfileConf struct { 926 Name string 927 ProfileID string 928 InstanceID string 929 Etag string 930 Description string 931 RoutingPolicy string 932 ClusterID string 933 AllowTransactionalWrites bool 934 935 // If true, warnings are ignored 936 IgnoreWarnings bool 937} 938 939// ProfileIterator iterates over profiles. 940type ProfileIterator struct { 941 items []*btapb.AppProfile 942 pageInfo *iterator.PageInfo 943 nextFunc func() error 944} 945 946// ProfileAttrsToUpdate define addrs to update during an Update call. If unset, no fields will be replaced. 947type ProfileAttrsToUpdate struct { 948 // If set, updates the description. 949 Description optional.String 950 951 //If set, updates the routing policy. 952 RoutingPolicy optional.String 953 954 //If RoutingPolicy is updated to SingleClusterRouting, set these fields as well. 955 ClusterID string 956 AllowTransactionalWrites bool 957 958 // If true, warnings are ignored 959 IgnoreWarnings bool 960} 961 962// GetFieldMaskPath returns the field mask path. 963func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string { 964 path := make([]string, 0) 965 if p.Description != nil { 966 path = append(path, "description") 967 } 968 969 if p.RoutingPolicy != nil { 970 path = append(path, optional.ToString(p.RoutingPolicy)) 971 } 972 return path 973} 974 975// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details. 976func (it *ProfileIterator) PageInfo() *iterator.PageInfo { 977 return it.pageInfo 978} 979 980// Next returns the next result. Its second return value is iterator.Done 981// (https://godoc.org/google.golang.org/api/iterator) if there are no more 982// results. Once Next returns Done, all subsequent calls will return Done. 983func (it *ProfileIterator) Next() (*btapb.AppProfile, error) { 984 if err := it.nextFunc(); err != nil { 985 return nil, err 986 } 987 item := it.items[0] 988 it.items = it.items[1:] 989 return item, nil 990} 991 992// CreateAppProfile creates an app profile within an instance. 993func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) { 994 ctx = mergeOutgoingMetadata(ctx, iac.md) 995 parent := "projects/" + iac.project + "/instances/" + profile.InstanceID 996 appProfile := &btapb.AppProfile{ 997 Etag: profile.Etag, 998 Description: profile.Description, 999 } 1000 1001 if profile.RoutingPolicy == "" { 1002 return nil, errors.New("invalid routing policy") 1003 } 1004 1005 switch profile.RoutingPolicy { 1006 case MultiClusterRouting: 1007 appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1008 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1009 } 1010 case SingleClusterRouting: 1011 appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1012 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1013 ClusterId: profile.ClusterID, 1014 AllowTransactionalWrites: profile.AllowTransactionalWrites, 1015 }, 1016 } 1017 default: 1018 return nil, errors.New("invalid routing policy") 1019 } 1020 1021 return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{ 1022 Parent: parent, 1023 AppProfile: appProfile, 1024 AppProfileId: profile.ProfileID, 1025 IgnoreWarnings: profile.IgnoreWarnings, 1026 }) 1027} 1028 1029// GetAppProfile gets information about an app profile. 1030func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) { 1031 ctx = mergeOutgoingMetadata(ctx, iac.md) 1032 profileRequest := &btapb.GetAppProfileRequest{ 1033 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1034 } 1035 var ap *btapb.AppProfile 1036 err := gax.Invoke(ctx, func(ctx context.Context) error { 1037 var err error 1038 ap, err = iac.iClient.GetAppProfile(ctx, profileRequest) 1039 return err 1040 }, retryOptions...) 1041 if err != nil { 1042 return nil, err 1043 } 1044 return ap, err 1045} 1046 1047// ListAppProfiles lists information about app profiles in an instance. 1048func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator { 1049 ctx = mergeOutgoingMetadata(ctx, iac.md) 1050 listRequest := &btapb.ListAppProfilesRequest{ 1051 Parent: "projects/" + iac.project + "/instances/" + instanceID, 1052 } 1053 1054 pit := &ProfileIterator{} 1055 fetch := func(pageSize int, pageToken string) (string, error) { 1056 listRequest.PageToken = pageToken 1057 var profileRes *btapb.ListAppProfilesResponse 1058 err := gax.Invoke(ctx, func(ctx context.Context) error { 1059 var err error 1060 profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest) 1061 return err 1062 }, retryOptions...) 1063 if err != nil { 1064 return "", err 1065 } 1066 1067 pit.items = append(pit.items, profileRes.AppProfiles...) 1068 return profileRes.NextPageToken, nil 1069 } 1070 1071 bufLen := func() int { return len(pit.items) } 1072 takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b } 1073 pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf) 1074 return pit 1075 1076} 1077 1078// UpdateAppProfile updates an app profile within an instance. 1079// updateAttrs should be set. If unset, all fields will be replaced. 1080func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error { 1081 ctx = mergeOutgoingMetadata(ctx, iac.md) 1082 1083 profile := &btapb.AppProfile{ 1084 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID, 1085 } 1086 1087 if updateAttrs.Description != nil { 1088 profile.Description = optional.ToString(updateAttrs.Description) 1089 } 1090 if updateAttrs.RoutingPolicy != nil { 1091 switch optional.ToString(updateAttrs.RoutingPolicy) { 1092 case MultiClusterRouting: 1093 profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{ 1094 MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{}, 1095 } 1096 case SingleClusterRouting: 1097 profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{ 1098 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1099 ClusterId: updateAttrs.ClusterID, 1100 AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites, 1101 }, 1102 } 1103 default: 1104 return errors.New("invalid routing policy") 1105 } 1106 } 1107 patchRequest := &btapb.UpdateAppProfileRequest{ 1108 AppProfile: profile, 1109 UpdateMask: &field_mask.FieldMask{ 1110 Paths: updateAttrs.GetFieldMaskPath(), 1111 }, 1112 IgnoreWarnings: updateAttrs.IgnoreWarnings, 1113 } 1114 updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest) 1115 if err != nil { 1116 return err 1117 } 1118 1119 return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil) 1120 1121} 1122 1123// DeleteAppProfile deletes an app profile from an instance. 1124func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error { 1125 ctx = mergeOutgoingMetadata(ctx, iac.md) 1126 deleteProfileRequest := &btapb.DeleteAppProfileRequest{ 1127 Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name, 1128 IgnoreWarnings: true, 1129 } 1130 _, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest) 1131 return err 1132 1133} 1134