1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"container/list"
21	"context"
22	"errors"
23	"fmt"
24	"math"
25	"regexp"
26	"strings"
27	"time"
28
29	btopt "cloud.google.com/go/bigtable/internal/option"
30	"cloud.google.com/go/iam"
31	"cloud.google.com/go/internal/optional"
32	"cloud.google.com/go/longrunning"
33	lroauto "cloud.google.com/go/longrunning/autogen"
34	"github.com/golang/protobuf/ptypes"
35	durpb "github.com/golang/protobuf/ptypes/duration"
36	gax "github.com/googleapis/gax-go/v2"
37	"google.golang.org/api/cloudresourcemanager/v1"
38	"google.golang.org/api/iterator"
39	"google.golang.org/api/option"
40	gtransport "google.golang.org/api/transport/grpc"
41	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
42	"google.golang.org/genproto/protobuf/field_mask"
43	"google.golang.org/grpc"
44	"google.golang.org/grpc/codes"
45	"google.golang.org/grpc/metadata"
46	"google.golang.org/grpc/status"
47)
48
// adminAddr is the default endpoint (host:port) handed to
// btopt.DefaultClientOptions when NewAdminClient builds its dial options.
const adminAddr = "bigtableadmin.googleapis.com:443"
50
// AdminClient is a client type for performing admin operations within a specific instance.
type AdminClient struct {
	// conn is the gRPC connection; it is closed by Close.
	conn      *grpc.ClientConn
	// tClient is the table admin stub created on top of conn.
	tClient   btapb.BigtableTableAdminClient
	// lroClient is used to wait on long running operations.
	lroClient *lroauto.OperationsClient

	// project and instance identify the instance all requests are scoped to.
	project, instance string

	// Metadata to be sent with each request.
	md metadata.MD
}
62
63// NewAdminClient creates a new AdminClient for a given project and instance.
64func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) {
65	o, err := btopt.DefaultClientOptions(adminAddr, AdminScope, clientUserAgent)
66	if err != nil {
67		return nil, err
68	}
69	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
70	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
71	// Need to add scopes for long running operations (for create table & snapshots)
72	o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope))
73	o = append(o, opts...)
74	conn, err := gtransport.Dial(ctx, o...)
75	if err != nil {
76		return nil, fmt.Errorf("dialing: %v", err)
77	}
78
79	lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn))
80	if err != nil {
81		// This error "should not happen", since we are just reusing old connection
82		// and never actually need to dial.
83		// If this does happen, we could leak conn. However, we cannot close conn:
84		// If the user invoked the function with option.WithGRPCConn,
85		// we would close a connection that's still in use.
86		// TODO(pongad): investigate error conditions.
87		return nil, err
88	}
89
90	return &AdminClient{
91		conn:      conn,
92		tClient:   btapb.NewBigtableTableAdminClient(conn),
93		lroClient: lroClient,
94		project:   project,
95		instance:  instance,
96		md:        metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)),
97	}, nil
98}
99
100// Close closes the AdminClient.
101func (ac *AdminClient) Close() error {
102	return ac.conn.Close()
103}
104
105func (ac *AdminClient) instancePrefix() string {
106	return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
107}
108
109// Tables returns a list of the tables in the instance.
110func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
111	ctx = mergeOutgoingMetadata(ctx, ac.md)
112	prefix := ac.instancePrefix()
113	req := &btapb.ListTablesRequest{
114		Parent: prefix,
115	}
116
117	var res *btapb.ListTablesResponse
118	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
119		var err error
120		res, err = ac.tClient.ListTables(ctx, req)
121		return err
122	}, retryOptions...)
123	if err != nil {
124		return nil, err
125	}
126
127	names := make([]string, 0, len(res.Tables))
128	for _, tbl := range res.Tables {
129		names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/"))
130	}
131	return names, nil
132}
133
// TableConf contains all of the information necessary to create a table with column families.
type TableConf struct {
	// TableID is the ID of the table to create.
	TableID   string
	// SplitKeys are row keys sent as the initial splits of the new table.
	SplitKeys []string
	// Families is a map from family name to GCPolicy
	Families map[string]GCPolicy
}
141
142// CreateTable creates a new table in the instance.
143// This method may return before the table's creation is complete.
144func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
145	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table})
146}
147
148// CreatePresplitTable creates a new table in the instance.
149// The list of row keys will be used to initially split the table into multiple tablets.
150// Given two split keys, "s1" and "s2", three tablets will be created,
151// spanning the key ranges: [, s1), [s1, s2), [s2, ).
152// This method may return before the table's creation is complete.
153func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error {
154	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys})
155}
156
157// CreateTableFromConf creates a new table in the instance from the given configuration.
158func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error {
159	ctx = mergeOutgoingMetadata(ctx, ac.md)
160	var reqSplits []*btapb.CreateTableRequest_Split
161	for _, split := range conf.SplitKeys {
162		reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)})
163	}
164	var tbl btapb.Table
165	if conf.Families != nil {
166		tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
167		for fam, policy := range conf.Families {
168			tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()}
169		}
170	}
171	prefix := ac.instancePrefix()
172	req := &btapb.CreateTableRequest{
173		Parent:        prefix,
174		TableId:       conf.TableID,
175		Table:         &tbl,
176		InitialSplits: reqSplits,
177	}
178	_, err := ac.tClient.CreateTable(ctx, req)
179	return err
180}
181
182// CreateColumnFamily creates a new column family in a table.
183func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error {
184	// TODO(dsymonds): Permit specifying gcexpr and any other family settings.
185	ctx = mergeOutgoingMetadata(ctx, ac.md)
186	prefix := ac.instancePrefix()
187	req := &btapb.ModifyColumnFamiliesRequest{
188		Name: prefix + "/tables/" + table,
189		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
190			Id:  family,
191			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
192		}},
193	}
194	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
195	return err
196}
197
198// DeleteTable deletes a table and all of its data.
199func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
200	ctx = mergeOutgoingMetadata(ctx, ac.md)
201	prefix := ac.instancePrefix()
202	req := &btapb.DeleteTableRequest{
203		Name: prefix + "/tables/" + table,
204	}
205	_, err := ac.tClient.DeleteTable(ctx, req)
206	return err
207}
208
209// DeleteColumnFamily deletes a column family in a table and all of its data.
210func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error {
211	ctx = mergeOutgoingMetadata(ctx, ac.md)
212	prefix := ac.instancePrefix()
213	req := &btapb.ModifyColumnFamiliesRequest{
214		Name: prefix + "/tables/" + table,
215		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
216			Id:  family,
217			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true},
218		}},
219	}
220	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
221	return err
222}
223
// TableInfo represents information about a table.
type TableInfo struct {
	// Families lists the names of the table's column families.
	//
	// Deprecated: Use FamilyInfos instead.
	Families    []string
	// FamilyInfos holds the name and GC policy of each column family.
	FamilyInfos []FamilyInfo
}
230
// FamilyInfo represents information about a column family.
type FamilyInfo struct {
	// Name is the column family name.
	Name     string
	// GCPolicy is the family's GC rule rendered as a string by GCRuleToString.
	GCPolicy string
}
236
237// TableInfo retrieves information about a table.
238func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) {
239	ctx = mergeOutgoingMetadata(ctx, ac.md)
240	prefix := ac.instancePrefix()
241	req := &btapb.GetTableRequest{
242		Name: prefix + "/tables/" + table,
243	}
244
245	var res *btapb.Table
246
247	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
248		var err error
249		res, err = ac.tClient.GetTable(ctx, req)
250		return err
251	}, retryOptions...)
252	if err != nil {
253		return nil, err
254	}
255
256	ti := &TableInfo{}
257	for name, fam := range res.ColumnFamilies {
258		ti.Families = append(ti.Families, name)
259		ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)})
260	}
261	return ti, nil
262}
263
264// SetGCPolicy specifies which cells in a column family should be garbage collected.
265// GC executes opportunistically in the background; table reads may return data
266// matching the GC policy.
267func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
268	ctx = mergeOutgoingMetadata(ctx, ac.md)
269	prefix := ac.instancePrefix()
270	req := &btapb.ModifyColumnFamiliesRequest{
271		Name: prefix + "/tables/" + table,
272		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
273			Id:  family,
274			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}},
275		}},
276	}
277	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
278	return err
279}
280
281// DropRowRange permanently deletes a row range from the specified table.
282func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error {
283	ctx = mergeOutgoingMetadata(ctx, ac.md)
284	prefix := ac.instancePrefix()
285	req := &btapb.DropRowRangeRequest{
286		Name:   prefix + "/tables/" + table,
287		Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)},
288	}
289	_, err := ac.tClient.DropRowRange(ctx, req)
290	return err
291}
292
293// DropAllRows permanently deletes all rows from the specified table.
294func (ac *AdminClient) DropAllRows(ctx context.Context, table string) error {
295	ctx = mergeOutgoingMetadata(ctx, ac.md)
296	prefix := ac.instancePrefix()
297	req := &btapb.DropRowRangeRequest{
298		Name:   prefix + "/tables/" + table,
299		Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true},
300	}
301	_, err := ac.tClient.DropRowRange(ctx, req)
302	return err
303}
304
305// CreateTableFromSnapshot creates a table from snapshot.
306// The table will be created in the same cluster as the snapshot.
307//
308// This is a private alpha release of Cloud Bigtable snapshots. This feature
309// is not currently available to most Cloud Bigtable customers. This feature
310// might be changed in backward-incompatible ways and is not recommended for
311// production use. It is not subject to any SLA or deprecation policy.
312func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error {
313	ctx = mergeOutgoingMetadata(ctx, ac.md)
314	prefix := ac.instancePrefix()
315	snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot
316
317	req := &btapb.CreateTableFromSnapshotRequest{
318		Parent:         prefix,
319		TableId:        table,
320		SourceSnapshot: snapshotPath,
321	}
322	op, err := ac.tClient.CreateTableFromSnapshot(ctx, req)
323	if err != nil {
324		return err
325	}
326	resp := btapb.Table{}
327	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
328}
329
// DefaultSnapshotDuration is the default TTL for a snapshot.
// SnapshotTable only sends a TTL when it is greater than zero, so passing
// this value leaves the TTL unset in the request.
const DefaultSnapshotDuration time.Duration = 0
332
333// SnapshotTable creates a new snapshot in the specified cluster from the
334// specified source table. Setting the TTL to `DefaultSnapshotDuration` will
335// use the server side default for the duration.
336//
337// This is a private alpha release of Cloud Bigtable snapshots. This feature
338// is not currently available to most Cloud Bigtable customers. This feature
339// might be changed in backward-incompatible ways and is not recommended for
340// production use. It is not subject to any SLA or deprecation policy.
341func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error {
342	ctx = mergeOutgoingMetadata(ctx, ac.md)
343	prefix := ac.instancePrefix()
344
345	var ttlProto *durpb.Duration
346
347	if ttl > 0 {
348		ttlProto = ptypes.DurationProto(ttl)
349	}
350
351	req := &btapb.SnapshotTableRequest{
352		Name:       prefix + "/tables/" + table,
353		Cluster:    prefix + "/clusters/" + cluster,
354		SnapshotId: snapshot,
355		Ttl:        ttlProto,
356	}
357
358	op, err := ac.tClient.SnapshotTable(ctx, req)
359	if err != nil {
360		return err
361	}
362	resp := btapb.Snapshot{}
363	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
364}
365
366// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster.
367// To list snapshots across all of the clusters in the instance specify "-" as the cluster.
368//
369// This is a private alpha release of Cloud Bigtable snapshots. This feature is not
370// currently available to most Cloud Bigtable customers. This feature might be
371// changed in backward-incompatible ways and is not recommended for production use.
372// It is not subject to any SLA or deprecation policy.
373func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator {
374	ctx = mergeOutgoingMetadata(ctx, ac.md)
375	prefix := ac.instancePrefix()
376	clusterPath := prefix + "/clusters/" + cluster
377
378	it := &SnapshotIterator{}
379	req := &btapb.ListSnapshotsRequest{
380		Parent: clusterPath,
381	}
382
383	fetch := func(pageSize int, pageToken string) (string, error) {
384		req.PageToken = pageToken
385		if pageSize > math.MaxInt32 {
386			req.PageSize = math.MaxInt32
387		} else {
388			req.PageSize = int32(pageSize)
389		}
390
391		var resp *btapb.ListSnapshotsResponse
392		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
393			var err error
394			resp, err = ac.tClient.ListSnapshots(ctx, req)
395			return err
396		}, retryOptions...)
397		if err != nil {
398			return "", err
399		}
400		for _, s := range resp.Snapshots {
401			snapshotInfo, err := newSnapshotInfo(s)
402			if err != nil {
403				return "", fmt.Errorf("failed to parse snapshot proto %v", err)
404			}
405			it.items = append(it.items, snapshotInfo)
406		}
407		return resp.NextPageToken, nil
408	}
409	bufLen := func() int { return len(it.items) }
410	takeBuf := func() interface{} { b := it.items; it.items = nil; return b }
411
412	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
413
414	return it
415}
416
417func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) {
418	nameParts := strings.Split(snapshot.Name, "/")
419	name := nameParts[len(nameParts)-1]
420	tablePathParts := strings.Split(snapshot.SourceTable.Name, "/")
421	tableID := tablePathParts[len(tablePathParts)-1]
422
423	createTime, err := ptypes.Timestamp(snapshot.CreateTime)
424	if err != nil {
425		return nil, fmt.Errorf("invalid createTime: %v", err)
426	}
427
428	deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime)
429	if err != nil {
430		return nil, fmt.Errorf("invalid deleteTime: %v", err)
431	}
432
433	return &SnapshotInfo{
434		Name:        name,
435		SourceTable: tableID,
436		DataSize:    snapshot.DataSizeBytes,
437		CreateTime:  createTime,
438		DeleteTime:  deleteTime,
439	}, nil
440}
441
// SnapshotIterator iterates over snapshots, yielding *SnapshotInfo values
// from Next.
//
// This is a private alpha release of Cloud Bigtable snapshots. This feature
// is not currently available to most Cloud Bigtable customers. This feature
// might be changed in backward-incompatible ways and is not recommended for
// production use. It is not subject to any SLA or deprecation policy.
type SnapshotIterator struct {
	// items buffers fetched snapshots that Next has not returned yet.
	items    []*SnapshotInfo
	// pageInfo is the pagination state exposed through PageInfo.
	pageInfo *iterator.PageInfo
	// nextFunc is called by Next before it reads from items.
	nextFunc func() error
}
453
454// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
455func (it *SnapshotIterator) PageInfo() *iterator.PageInfo {
456	return it.pageInfo
457}
458
459// Next returns the next result. Its second return value is iterator.Done
460// (https://godoc.org/google.golang.org/api/iterator) if there are no more
461// results. Once Next returns Done, all subsequent calls will return Done.
462func (it *SnapshotIterator) Next() (*SnapshotInfo, error) {
463	if err := it.nextFunc(); err != nil {
464		return nil, err
465	}
466	item := it.items[0]
467	it.items = it.items[1:]
468	return item, nil
469}
470
// SnapshotInfo contains snapshot metadata.
type SnapshotInfo struct {
	// Name is the last path segment of the snapshot's resource name.
	Name        string
	// SourceTable is the last path segment of the source table's resource name.
	SourceTable string
	// DataSize is the snapshot's size in bytes as reported by the service.
	DataSize    int64
	// CreateTime is the snapshot's creation timestamp.
	CreateTime  time.Time
	// DeleteTime is the snapshot's deletion timestamp as reported by the service.
	DeleteTime  time.Time
}
479
480// SnapshotInfo gets snapshot metadata.
481//
482// This is a private alpha release of Cloud Bigtable snapshots. This feature
483// is not currently available to most Cloud Bigtable customers. This feature
484// might be changed in backward-incompatible ways and is not recommended for
485// production use. It is not subject to any SLA or deprecation policy.
486func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) {
487	ctx = mergeOutgoingMetadata(ctx, ac.md)
488	prefix := ac.instancePrefix()
489	clusterPath := prefix + "/clusters/" + cluster
490	snapshotPath := clusterPath + "/snapshots/" + snapshot
491
492	req := &btapb.GetSnapshotRequest{
493		Name: snapshotPath,
494	}
495
496	var resp *btapb.Snapshot
497	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
498		var err error
499		resp, err = ac.tClient.GetSnapshot(ctx, req)
500		return err
501	}, retryOptions...)
502	if err != nil {
503		return nil, err
504	}
505
506	return newSnapshotInfo(resp)
507}
508
509// DeleteSnapshot deletes a snapshot in a cluster.
510//
511// This is a private alpha release of Cloud Bigtable snapshots. This feature
512// is not currently available to most Cloud Bigtable customers. This feature
513// might be changed in backward-incompatible ways and is not recommended for
514// production use. It is not subject to any SLA or deprecation policy.
515func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error {
516	ctx = mergeOutgoingMetadata(ctx, ac.md)
517	prefix := ac.instancePrefix()
518	clusterPath := prefix + "/clusters/" + cluster
519	snapshotPath := clusterPath + "/snapshots/" + snapshot
520
521	req := &btapb.DeleteSnapshotRequest{
522		Name: snapshotPath,
523	}
524	_, err := ac.tClient.DeleteSnapshot(ctx, req)
525	return err
526}
527
528// getConsistencyToken gets the consistency token for a table.
529func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) {
530	req := &btapb.GenerateConsistencyTokenRequest{
531		Name: tableName,
532	}
533	resp, err := ac.tClient.GenerateConsistencyToken(ctx, req)
534	if err != nil {
535		return "", err
536	}
537	return resp.GetConsistencyToken(), nil
538}
539
540// isConsistent checks if a token is consistent for a table.
541func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) {
542	req := &btapb.CheckConsistencyRequest{
543		Name:             tableName,
544		ConsistencyToken: token,
545	}
546	var resp *btapb.CheckConsistencyResponse
547
548	// Retry calls on retryable errors to avoid losing the token gathered before.
549	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
550		var err error
551		resp, err = ac.tClient.CheckConsistency(ctx, req)
552		return err
553	}, retryOptions...)
554	if err != nil {
555		return false, err
556	}
557	return resp.GetConsistent(), nil
558}
559
560// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication.
561func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error {
562	ctx = mergeOutgoingMetadata(ctx, ac.md)
563	// Get the token.
564	prefix := ac.instancePrefix()
565	tableName := prefix + "/tables/" + table
566	token, err := ac.getConsistencyToken(ctx, tableName)
567	if err != nil {
568		return err
569	}
570
571	// Periodically check if the token is consistent.
572	timer := time.NewTicker(time.Second * 10)
573	defer timer.Stop()
574	for {
575		consistent, err := ac.isConsistent(ctx, tableName, token)
576		if err != nil {
577			return err
578		}
579		if consistent {
580			return nil
581		}
582		// Sleep for a bit or until the ctx is cancelled.
583		select {
584		case <-ctx.Done():
585			return ctx.Err()
586		case <-timer.C:
587		}
588	}
589}
590
591// TableIAM creates an IAM client specific to a given Instance and Table within the configured project.
592func (ac *AdminClient) TableIAM(tableID string) *iam.Handle {
593	return iam.InternalNewHandleGRPCClient(ac.tClient,
594		"projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID)
595}
596
// instanceAdminAddr is the default endpoint (host:port) handed to
// btopt.DefaultClientOptions when NewInstanceAdminClient builds its dial options.
const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
598
// InstanceAdminClient is a client type for performing admin operations on instances.
// These operations can be substantially more dangerous than those provided by AdminClient.
type InstanceAdminClient struct {
	// conn is the gRPC connection; it is closed by Close.
	conn      *grpc.ClientConn
	// iClient is the instance admin stub created on top of conn.
	iClient   btapb.BigtableInstanceAdminClient
	// lroClient is used to wait on long running operations.
	lroClient *lroauto.OperationsClient

	// project is the project all requests are scoped to.
	project string

	// Metadata to be sent with each request.
	md metadata.MD
}
611
612// NewInstanceAdminClient creates a new InstanceAdminClient for a given project.
613func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) {
614	o, err := btopt.DefaultClientOptions(instanceAdminAddr, InstanceAdminScope, clientUserAgent)
615	if err != nil {
616		return nil, err
617	}
618	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
619	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
620	o = append(o, opts...)
621	conn, err := gtransport.Dial(ctx, o...)
622	if err != nil {
623		return nil, fmt.Errorf("dialing: %v", err)
624	}
625
626	lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn))
627	if err != nil {
628		// This error "should not happen", since we are just reusing old connection
629		// and never actually need to dial.
630		// If this does happen, we could leak conn. However, we cannot close conn:
631		// If the user invoked the function with option.WithGRPCConn,
632		// we would close a connection that's still in use.
633		// TODO(pongad): investigate error conditions.
634		return nil, err
635	}
636
637	return &InstanceAdminClient{
638		conn:      conn,
639		iClient:   btapb.NewBigtableInstanceAdminClient(conn),
640		lroClient: lroClient,
641
642		project: project,
643		md:      metadata.Pairs(resourcePrefixHeader, "projects/"+project),
644	}, nil
645}
646
647// Close closes the InstanceAdminClient.
648func (iac *InstanceAdminClient) Close() error {
649	return iac.conn.Close()
650}
651
// StorageType is the type of storage used for all tables in an instance
type StorageType int

const (
	// SSD is the zero value of StorageType; it converts to btapb.StorageType_SSD.
	SSD StorageType = iota
	// HDD converts to btapb.StorageType_HDD.
	HDD
)
659
660func (st StorageType) proto() btapb.StorageType {
661	if st == HDD {
662		return btapb.StorageType_HDD
663	}
664	return btapb.StorageType_SSD
665}
666
667func storageTypeFromProto(st btapb.StorageType) StorageType {
668	if st == btapb.StorageType_HDD {
669		return HDD
670	}
671
672	return SSD
673}
674
675// InstanceType is the type of the instance
676type InstanceType int32
677
678const (
679	// UNSPECIFIED instance types default to PRODUCTION
680	UNSPECIFIED InstanceType = InstanceType(btapb.Instance_TYPE_UNSPECIFIED)
681	PRODUCTION               = InstanceType(btapb.Instance_PRODUCTION)
682	DEVELOPMENT              = InstanceType(btapb.Instance_DEVELOPMENT)
683)
684
// InstanceInfo represents information about an instance
type InstanceInfo struct {
	Name         string // name of the instance
	DisplayName  string // display name for UIs
	// InstanceType is the instance's type as reported by the service.
	InstanceType InstanceType
}
691
692// InstanceConf contains the information necessary to create an Instance
693type InstanceConf struct {
694	InstanceId, DisplayName, ClusterId, Zone string
695	// NumNodes must not be specified for DEVELOPMENT instance types
696	NumNodes     int32
697	StorageType  StorageType
698	InstanceType InstanceType
699}
700
701// InstanceWithClustersConfig contains the information necessary to create an Instance
702type InstanceWithClustersConfig struct {
703	InstanceID, DisplayName string
704	Clusters                []ClusterConfig
705	InstanceType            InstanceType
706}
707
// instanceNameRegexp matches full instance resource names of the form
// "projects/<project>/instances/<instance>". The second capture group is the
// instance ID: a lowercase letter followed by lowercase letters, digits or
// hyphens.
var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`)
709
710// CreateInstance creates a new instance in the project.
711// This method will return when the instance has been created or when an error occurs.
712func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
713	ctx = mergeOutgoingMetadata(ctx, iac.md)
714	newConfig := InstanceWithClustersConfig{
715		InstanceID:   conf.InstanceId,
716		DisplayName:  conf.DisplayName,
717		InstanceType: conf.InstanceType,
718		Clusters: []ClusterConfig{
719			{
720				InstanceID:  conf.InstanceId,
721				ClusterID:   conf.ClusterId,
722				Zone:        conf.Zone,
723				NumNodes:    conf.NumNodes,
724				StorageType: conf.StorageType,
725			},
726		},
727	}
728	return iac.CreateInstanceWithClusters(ctx, &newConfig)
729}
730
731// CreateInstanceWithClusters creates a new instance with configured clusters in the project.
732// This method will return when the instance has been created or when an error occurs.
733func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
734	ctx = mergeOutgoingMetadata(ctx, iac.md)
735	clusters := make(map[string]*btapb.Cluster)
736	for _, cluster := range conf.Clusters {
737		clusters[cluster.ClusterID] = cluster.proto(iac.project)
738	}
739
740	req := &btapb.CreateInstanceRequest{
741		Parent:     "projects/" + iac.project,
742		InstanceId: conf.InstanceID,
743		Instance:   &btapb.Instance{DisplayName: conf.DisplayName, Type: btapb.Instance_Type(conf.InstanceType)},
744		Clusters:   clusters,
745	}
746
747	lro, err := iac.iClient.CreateInstance(ctx, req)
748	if err != nil {
749		return err
750	}
751	resp := btapb.Instance{}
752	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
753}
754
755// updateInstance updates a single instance based on config fields that operate
756// at an instance level: DisplayName and InstanceType.
757func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *InstanceWithClustersConfig) (updated bool, err error) {
758	if conf.InstanceID == "" {
759		return false, errors.New("InstanceID is required")
760	}
761
762	// Update the instance, if necessary
763	mask := &field_mask.FieldMask{}
764	ireq := &btapb.PartialUpdateInstanceRequest{
765		Instance: &btapb.Instance{
766			Name: "projects/" + iac.project + "/instances/" + conf.InstanceID,
767		},
768		UpdateMask: mask,
769	}
770	if conf.DisplayName != "" {
771		ireq.Instance.DisplayName = conf.DisplayName
772		mask.Paths = append(mask.Paths, "display_name")
773	}
774	if btapb.Instance_Type(conf.InstanceType) != btapb.Instance_TYPE_UNSPECIFIED {
775		ireq.Instance.Type = btapb.Instance_Type(conf.InstanceType)
776		mask.Paths = append(mask.Paths, "type")
777	}
778
779	if len(mask.Paths) == 0 {
780		return false, nil
781	}
782
783	lro, err := iac.iClient.PartialUpdateInstance(ctx, ireq)
784	if err != nil {
785		return false, err
786	}
787	err = longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
788	if err != nil {
789		return false, err
790	}
791
792	return true, nil
793}
794
795// UpdateInstanceWithClusters updates an instance and its clusters. Updateable
796// fields are instance display name, instance type and cluster size.
797// The provided InstanceWithClustersConfig is used as follows:
798// - InstanceID is required
799// - DisplayName and InstanceType are updated only if they are not empty
800// - ClusterID is required for any provided cluster
801// - All other cluster fields are ignored except for NumNodes, which if set will be updated
802//
803// This method may return an error after partially succeeding, for example if the instance is updated
804// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
805// determine the current state.
806func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
807	ctx = mergeOutgoingMetadata(ctx, iac.md)
808
809	for _, cluster := range conf.Clusters {
810		if cluster.ClusterID == "" {
811			return errors.New("ClusterID is required for every cluster")
812		}
813	}
814
815	updatedInstance, err := iac.updateInstance(ctx, conf)
816	if err != nil {
817		return err
818	}
819
820	// Update any clusters
821	for _, cluster := range conf.Clusters {
822		err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
823		if err != nil {
824			if updatedInstance {
825				// We updated the instance, so note that in the error message.
826				return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded",
827					cluster.ClusterID, err)
828			}
829			return err
830		}
831	}
832
833	return nil
834}
835
836// DeleteInstance deletes an instance from the project.
837func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error {
838	ctx = mergeOutgoingMetadata(ctx, iac.md)
839	req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID}
840	_, err := iac.iClient.DeleteInstance(ctx, req)
841	return err
842}
843
844// Instances returns a list of instances in the project.
845func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) {
846	ctx = mergeOutgoingMetadata(ctx, iac.md)
847	req := &btapb.ListInstancesRequest{
848		Parent: "projects/" + iac.project,
849	}
850	var res *btapb.ListInstancesResponse
851	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
852		var err error
853		res, err = iac.iClient.ListInstances(ctx, req)
854		return err
855	}, retryOptions...)
856	if err != nil {
857		return nil, err
858	}
859	if len(res.FailedLocations) > 0 {
860		// We don't have a good way to return a partial result in the face of some zones being unavailable.
861		// Fail the entire request.
862		return nil, status.Errorf(codes.Unavailable, "Failed locations: %v", res.FailedLocations)
863	}
864
865	var is []*InstanceInfo
866	for _, i := range res.Instances {
867		m := instanceNameRegexp.FindStringSubmatch(i.Name)
868		if m == nil {
869			return nil, fmt.Errorf("malformed instance name %q", i.Name)
870		}
871		is = append(is, &InstanceInfo{
872			Name:         m[2],
873			DisplayName:  i.DisplayName,
874			InstanceType: InstanceType(i.Type),
875		})
876	}
877	return is, nil
878}
879
880// InstanceInfo returns information about an instance.
881func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) {
882	ctx = mergeOutgoingMetadata(ctx, iac.md)
883	req := &btapb.GetInstanceRequest{
884		Name: "projects/" + iac.project + "/instances/" + instanceID,
885	}
886	var res *btapb.Instance
887	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
888		var err error
889		res, err = iac.iClient.GetInstance(ctx, req)
890		return err
891	}, retryOptions...)
892	if err != nil {
893		return nil, err
894	}
895
896	m := instanceNameRegexp.FindStringSubmatch(res.Name)
897	if m == nil {
898		return nil, fmt.Errorf("malformed instance name %q", res.Name)
899	}
900	return &InstanceInfo{
901		Name:         m[2],
902		DisplayName:  res.DisplayName,
903		InstanceType: InstanceType(res.Type),
904	}, nil
905}
906
// ClusterConfig contains the information necessary to create a cluster.
type ClusterConfig struct {
	// InstanceID names the instance that owns the cluster and ClusterID names
	// the cluster itself. Zone is the location ID that is appended to
	// "projects/<project>/locations/" when the cluster proto is built.
	InstanceID, ClusterID, Zone string
	// NumNodes is sent as the cluster's serve node count.
	NumNodes                    int32
	// StorageType is sent as the cluster's default storage type.
	StorageType                 StorageType
}
913
914func (cc *ClusterConfig) proto(project string) *btapb.Cluster {
915	return &btapb.Cluster{
916		ServeNodes:         cc.NumNodes,
917		DefaultStorageType: cc.StorageType.proto(),
918		Location:           "projects/" + project + "/locations/" + cc.Zone,
919	}
920}
921
// ClusterInfo represents information about a cluster, as returned by
// Clusters and GetCluster. Name and Zone hold only the final path segment of
// the cluster's resource name and location; State is the string form of the
// cluster's state enum.
type ClusterInfo struct {
	Name        string      // name of the cluster
	Zone        string      // GCP zone of the cluster (e.g. "us-central1-a")
	ServeNodes  int         // number of allocated serve nodes
	State       string      // state of the cluster
	StorageType StorageType // the storage type of the cluster
}
930
931// CreateCluster creates a new cluster in an instance.
932// This method will return when the cluster has been created or when an error occurs.
933func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error {
934	ctx = mergeOutgoingMetadata(ctx, iac.md)
935
936	req := &btapb.CreateClusterRequest{
937		Parent:    "projects/" + iac.project + "/instances/" + conf.InstanceID,
938		ClusterId: conf.ClusterID,
939		Cluster:   conf.proto(iac.project),
940	}
941
942	lro, err := iac.iClient.CreateCluster(ctx, req)
943	if err != nil {
944		return err
945	}
946	resp := btapb.Cluster{}
947	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
948}
949
950// DeleteCluster deletes a cluster from an instance.
951func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error {
952	ctx = mergeOutgoingMetadata(ctx, iac.md)
953	req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
954	_, err := iac.iClient.DeleteCluster(ctx, req)
955	return err
956}
957
958// UpdateCluster updates attributes of a cluster
959func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error {
960	ctx = mergeOutgoingMetadata(ctx, iac.md)
961	cluster := &btapb.Cluster{
962		Name:       "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
963		ServeNodes: serveNodes}
964	lro, err := iac.iClient.UpdateCluster(ctx, cluster)
965	if err != nil {
966		return err
967	}
968	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
969}
970
971// Clusters lists the clusters in an instance.
972func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) {
973	ctx = mergeOutgoingMetadata(ctx, iac.md)
974	req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID}
975	var res *btapb.ListClustersResponse
976	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
977		var err error
978		res, err = iac.iClient.ListClusters(ctx, req)
979		return err
980	}, retryOptions...)
981	if err != nil {
982		return nil, err
983	}
984	// TODO(garyelliott): Deal with failed_locations.
985	var cis []*ClusterInfo
986	for _, c := range res.Clusters {
987		nameParts := strings.Split(c.Name, "/")
988		locParts := strings.Split(c.Location, "/")
989		cis = append(cis, &ClusterInfo{
990			Name:        nameParts[len(nameParts)-1],
991			Zone:        locParts[len(locParts)-1],
992			ServeNodes:  int(c.ServeNodes),
993			State:       c.State.String(),
994			StorageType: storageTypeFromProto(c.DefaultStorageType),
995		})
996	}
997	return cis, nil
998}
999
1000// GetCluster fetches a cluster in an instance
1001func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) {
1002	ctx = mergeOutgoingMetadata(ctx, iac.md)
1003	req := &btapb.GetClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
1004	var c *btapb.Cluster
1005	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1006		var err error
1007		c, err = iac.iClient.GetCluster(ctx, req)
1008		return err
1009	}, retryOptions...)
1010	if err != nil {
1011		return nil, err
1012	}
1013
1014	nameParts := strings.Split(c.Name, "/")
1015	locParts := strings.Split(c.Location, "/")
1016	cis := &ClusterInfo{
1017		Name:        nameParts[len(nameParts)-1],
1018		Zone:        locParts[len(locParts)-1],
1019		ServeNodes:  int(c.ServeNodes),
1020		State:       c.State.String(),
1021		StorageType: storageTypeFromProto(c.DefaultStorageType),
1022	}
1023	return cis, nil
1024}
1025
1026// InstanceIAM returns the instance's IAM handle.
1027func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle {
1028	return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID)
1029}
1030
// Routing policies. These are the values accepted in ProfileConf.RoutingPolicy
// and ProfileAttrsToUpdate.RoutingPolicy.
const (
	// MultiClusterRouting is a policy that allows read/write requests to be
	// routed to any cluster in the instance. Requests will fail over to
	// another cluster in the event of transient errors or delays. Choosing
	// this option sacrifices read-your-writes consistency to improve
	// availability.
	MultiClusterRouting = "multi_cluster_routing_use_any"
	// SingleClusterRouting is a policy that unconditionally routes all
	// read/write requests to a specific cluster. This option preserves
	// read-your-writes consistency, but does not improve availability.
	SingleClusterRouting = "single_cluster_routing"
)
1044
// ProfileConf contains the information necessary to create an app profile.
type ProfileConf struct {
	// NOTE(review): Name is not read by CreateAppProfile in this file; confirm
	// its intended use.
	Name                     string
	// ProfileID is sent as the ID of the app profile to create.
	ProfileID                string
	// InstanceID names the instance in which the profile is created.
	InstanceID               string
	// Etag and Description are copied verbatim into the created profile.
	Etag                     string
	Description              string
	// RoutingPolicy must be MultiClusterRouting or SingleClusterRouting;
	// CreateAppProfile rejects any other value.
	RoutingPolicy            string
	// ClusterID and AllowTransactionalWrites are only used when RoutingPolicy
	// is SingleClusterRouting.
	ClusterID                string
	AllowTransactionalWrites bool

	// If true, warnings are ignored
	IgnoreWarnings bool
}
1059
// ProfileIterator iterates over profiles.
type ProfileIterator struct {
	// items buffers profiles that have been fetched but not yet returned by Next.
	items    []*btapb.AppProfile
	// pageInfo is the pagination state exposed through PageInfo.
	pageInfo *iterator.PageInfo
	// nextFunc is called at the start of every Next; Next returns its error
	// as-is and otherwise takes the first buffered item.
	nextFunc func() error
}
1066
// ProfileAttrsToUpdate defines the attributes to update during an
// UpdateAppProfile call. If unset, no fields will be replaced.
type ProfileAttrsToUpdate struct {
	// If set, updates the description.
	Description optional.String

	// If set, updates the routing policy. The value must be
	// MultiClusterRouting or SingleClusterRouting.
	RoutingPolicy optional.String

	// If RoutingPolicy is updated to SingleClusterRouting, set these fields as well.
	ClusterID                string
	AllowTransactionalWrites bool

	// If true, warnings are ignored
	IgnoreWarnings bool
}
1082
1083// GetFieldMaskPath returns the field mask path.
1084func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string {
1085	path := make([]string, 0)
1086	if p.Description != nil {
1087		path = append(path, "description")
1088	}
1089
1090	if p.RoutingPolicy != nil {
1091		path = append(path, optional.ToString(p.RoutingPolicy))
1092	}
1093	return path
1094}
1095
1096// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
1097func (it *ProfileIterator) PageInfo() *iterator.PageInfo {
1098	return it.pageInfo
1099}
1100
1101// Next returns the next result. Its second return value is iterator.Done
1102// (https://godoc.org/google.golang.org/api/iterator) if there are no more
1103// results. Once Next returns Done, all subsequent calls will return Done.
1104func (it *ProfileIterator) Next() (*btapb.AppProfile, error) {
1105	if err := it.nextFunc(); err != nil {
1106		return nil, err
1107	}
1108	item := it.items[0]
1109	it.items = it.items[1:]
1110	return item, nil
1111}
1112
1113// CreateAppProfile creates an app profile within an instance.
1114func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) {
1115	ctx = mergeOutgoingMetadata(ctx, iac.md)
1116	parent := "projects/" + iac.project + "/instances/" + profile.InstanceID
1117	appProfile := &btapb.AppProfile{
1118		Etag:        profile.Etag,
1119		Description: profile.Description,
1120	}
1121
1122	if profile.RoutingPolicy == "" {
1123		return nil, errors.New("invalid routing policy")
1124	}
1125
1126	switch profile.RoutingPolicy {
1127	case MultiClusterRouting:
1128		appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1129			MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1130		}
1131	case SingleClusterRouting:
1132		appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1133			SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1134				ClusterId:                profile.ClusterID,
1135				AllowTransactionalWrites: profile.AllowTransactionalWrites,
1136			},
1137		}
1138	default:
1139		return nil, errors.New("invalid routing policy")
1140	}
1141
1142	return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{
1143		Parent:         parent,
1144		AppProfile:     appProfile,
1145		AppProfileId:   profile.ProfileID,
1146		IgnoreWarnings: profile.IgnoreWarnings,
1147	})
1148}
1149
1150// GetAppProfile gets information about an app profile.
1151func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) {
1152	ctx = mergeOutgoingMetadata(ctx, iac.md)
1153	profileRequest := &btapb.GetAppProfileRequest{
1154		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1155	}
1156	var ap *btapb.AppProfile
1157	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1158		var err error
1159		ap, err = iac.iClient.GetAppProfile(ctx, profileRequest)
1160		return err
1161	}, retryOptions...)
1162	if err != nil {
1163		return nil, err
1164	}
1165	return ap, err
1166}
1167
1168// ListAppProfiles lists information about app profiles in an instance.
1169func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator {
1170	ctx = mergeOutgoingMetadata(ctx, iac.md)
1171	listRequest := &btapb.ListAppProfilesRequest{
1172		Parent: "projects/" + iac.project + "/instances/" + instanceID,
1173	}
1174
1175	pit := &ProfileIterator{}
1176	fetch := func(pageSize int, pageToken string) (string, error) {
1177		listRequest.PageToken = pageToken
1178		var profileRes *btapb.ListAppProfilesResponse
1179		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1180			var err error
1181			profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest)
1182			return err
1183		}, retryOptions...)
1184		if err != nil {
1185			return "", err
1186		}
1187
1188		pit.items = append(pit.items, profileRes.AppProfiles...)
1189		return profileRes.NextPageToken, nil
1190	}
1191
1192	bufLen := func() int { return len(pit.items) }
1193	takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b }
1194	pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
1195	return pit
1196
1197}
1198
1199// UpdateAppProfile updates an app profile within an instance.
1200// updateAttrs should be set. If unset, all fields will be replaced.
1201func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error {
1202	ctx = mergeOutgoingMetadata(ctx, iac.md)
1203
1204	profile := &btapb.AppProfile{
1205		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID,
1206	}
1207
1208	if updateAttrs.Description != nil {
1209		profile.Description = optional.ToString(updateAttrs.Description)
1210	}
1211	if updateAttrs.RoutingPolicy != nil {
1212		switch optional.ToString(updateAttrs.RoutingPolicy) {
1213		case MultiClusterRouting:
1214			profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1215				MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1216			}
1217		case SingleClusterRouting:
1218			profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1219				SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1220					ClusterId:                updateAttrs.ClusterID,
1221					AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites,
1222				},
1223			}
1224		default:
1225			return errors.New("invalid routing policy")
1226		}
1227	}
1228	patchRequest := &btapb.UpdateAppProfileRequest{
1229		AppProfile: profile,
1230		UpdateMask: &field_mask.FieldMask{
1231			Paths: updateAttrs.GetFieldMaskPath(),
1232		},
1233		IgnoreWarnings: updateAttrs.IgnoreWarnings,
1234	}
1235	updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest)
1236	if err != nil {
1237		return err
1238	}
1239
1240	return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil)
1241
1242}
1243
1244// DeleteAppProfile deletes an app profile from an instance.
1245func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error {
1246	ctx = mergeOutgoingMetadata(ctx, iac.md)
1247	deleteProfileRequest := &btapb.DeleteAppProfileRequest{
1248		Name:           "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1249		IgnoreWarnings: true,
1250	}
1251	_, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest)
1252	return err
1253
1254}
1255
// UpdateInstanceResults describes the changes made by a call to
// UpdateInstanceAndSyncClusters.
type UpdateInstanceResults struct {
	// InstanceUpdated records whether the instance itself was updated.
	InstanceUpdated bool
	// CreatedClusters, DeletedClusters and UpdatedClusters hold the IDs of
	// the clusters that were created, deleted and updated, respectively.
	CreatedClusters []string
	DeletedClusters []string
	UpdatedClusters []string
}

// String renders the results as a single human-readable line.
func (r *UpdateInstanceResults) String() string {
	const layout = "Instance updated? %v Clusters added:%v Clusters deleted:%v Clusters updated:%v"
	return fmt.Sprintf(layout, r.InstanceUpdated, r.CreatedClusters, r.DeletedClusters, r.UpdatedClusters)
}
1269
// max returns the larger of x and y.
func max(x, y int) int {
	if y >= x {
		return y
	}
	return x
}
1276
// UpdateInstanceAndSyncClusters updates an instance and its clusters, and will synchronize the
// clusters in the instance with the provided clusters, creating and deleting them as necessary.
// The provided InstanceWithClustersConfig is used as follows:
// - InstanceID is required
// - DisplayName and InstanceType are updated only if they are not empty
// - ClusterID is required for any provided cluster
// - Any cluster present in conf.Clusters but not part of the instance will be created using CreateCluster
//   and the given ClusterConfig.
// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance
//   using DeleteCluster.
// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the
//   provided number of nodes if set (that is, if NumNodes is greater than zero).
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
// determine the current state. The returned UpdateInstanceResults will describe the work done by the
// method, whether partial or complete. It is nil if listing the existing clusters or updating the
// instance itself fails.
func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient, conf *InstanceWithClustersConfig) (*UpdateInstanceResults, error) {
	ctx = mergeOutgoingMetadata(ctx, iac.md)

	// First fetch the existing clusters so we know what to remove, add or update.
	existingClusters, err := iac.Clusters(ctx, conf.InstanceID)
	if err != nil {
		return nil, err
	}

	// Apply the instance-level changes. The returned flag is recorded in the
	// results as InstanceUpdated.
	updatedInstance, err := iac.updateInstance(ctx, conf)
	if err != nil {
		return nil, err
	}

	results := &UpdateInstanceResults{InstanceUpdated: updatedInstance}

	// Index the existing clusters by name. Entries are removed as they are
	// matched against conf.Clusters, so whatever remains must be deleted.
	existingClusterNames := make(map[string]bool)
	for _, cluster := range existingClusters {
		existingClusterNames[cluster.Name] = true
	}

	// Synchronize clusters that were passed in with the existing clusters in the instance.
	// First update any cluster we encounter that already exists in the instance.
	// Collect the clusters that we will create and delete so that we can minimize disruption
	// of the instance.
	clustersToCreate := list.New()
	clustersToDelete := list.New()
	for _, cluster := range conf.Clusters {
		_, clusterExists := existingClusterNames[cluster.ClusterID]
		if !clusterExists {
			// The cluster doesn't exist yet, so we must create it.
			clustersToCreate.PushBack(cluster)
			continue
		}
		// Matched: this cluster is kept, so it must not end up in the deletion list.
		delete(existingClusterNames, cluster.ClusterID)

		if cluster.NumNodes <= 0 {
			// We only synchronize clusters with a valid number of nodes.
			continue
		}

		// We simply want to update this cluster
		err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
		if err != nil {
			return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
				cluster.ClusterID, err, results)
		}
		results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
	}

	// Any cluster left in existingClusterNames was NOT in the given config and should be deleted.
	// Map iteration order is unspecified, so the order of deletions is too.
	for clusterToDelete := range existingClusterNames {
		clustersToDelete.PushBack(clusterToDelete)
	}

	// Now that we have the clusters that we need to create and delete, we do so keeping the following
	// in mind:
	// - Don't delete the last cluster in the instance, as that will result in an error.
	// - Attempt to offset each deletion with a creation before another deletion, so that instance
	//   capacity is never reduced more than necessary.
	// Note that there is a limit on number of clusters in an instance which we are not aware of here,
	// so delete a cluster before adding one (as long as there are > 1 clusters left) so that we are
	// less likely to exceed the maximum number of clusters.
	numExistingClusters := len(existingClusters)
	nextCreation := clustersToCreate.Front()
	nextDeletion := clustersToDelete.Front()
	for {
		// We are done when both lists are empty.
		if nextCreation == nil && nextDeletion == nil {
			break
		}

		// If there is more than one existing cluster, we always want to delete first if possible.
		// If there are no more creations left, always go ahead with the deletion.
		// (When nextCreation is nil here, nextDeletion cannot be nil because of the check above.)
		if (numExistingClusters > 1 && nextDeletion != nil) || nextCreation == nil {
			clusterToDelete := nextDeletion.Value.(string)
			err = iac.DeleteCluster(ctx, conf.InstanceID, clusterToDelete)
			if err != nil {
				return results, fmt.Errorf("DeleteCluster %q failed %v; Progress: %v",
					clusterToDelete, err, results)
			}
			results.DeletedClusters = append(results.DeletedClusters, clusterToDelete)
			numExistingClusters--
			nextDeletion = nextDeletion.Next()
		}

		// Now create a new cluster if required.
		if nextCreation != nil {
			// The list holds ClusterConfig values, so clusterToCreate is a copy and
			// setting its InstanceID below does not modify conf.Clusters.
			clusterToCreate := nextCreation.Value.(ClusterConfig)
			// Assume the cluster config is well formed and rely on the underlying call to error out.
			// Make sure to set the InstanceID, though, since we know what it must be.
			clusterToCreate.InstanceID = conf.InstanceID
			err = iac.CreateCluster(ctx, &clusterToCreate)
			if err != nil {
				return results, fmt.Errorf("CreateCluster %v failed %v; Progress: %v",
					clusterToCreate, err, results)
			}
			results.CreatedClusters = append(results.CreatedClusters, clusterToCreate.ClusterID)
			numExistingClusters++
			nextCreation = nextCreation.Next()
		}
	}

	return results, nil
}
1399