1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"container/list"
21	"context"
22	"errors"
23	"fmt"
24	"math"
25	"regexp"
26	"strings"
27	"time"
28
29	btopt "cloud.google.com/go/bigtable/internal/option"
30	"cloud.google.com/go/iam"
31	"cloud.google.com/go/internal/optional"
32	"cloud.google.com/go/longrunning"
33	lroauto "cloud.google.com/go/longrunning/autogen"
34	"github.com/golang/protobuf/ptypes"
35	durpb "github.com/golang/protobuf/ptypes/duration"
36	gax "github.com/googleapis/gax-go/v2"
37	"google.golang.org/api/cloudresourcemanager/v1"
38	"google.golang.org/api/iterator"
39	"google.golang.org/api/option"
40	gtransport "google.golang.org/api/transport/grpc"
41	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
42	"google.golang.org/genproto/protobuf/field_mask"
43	"google.golang.org/grpc/codes"
44	"google.golang.org/grpc/metadata"
45	"google.golang.org/grpc/status"
46)
47
48const adminAddr = "bigtableadmin.googleapis.com:443"
49
50// AdminClient is a client type for performing admin operations within a specific instance.
51type AdminClient struct {
52	connPool  gtransport.ConnPool
53	tClient   btapb.BigtableTableAdminClient
54	lroClient *lroauto.OperationsClient
55
56	project, instance string
57
58	// Metadata to be sent with each request.
59	md metadata.MD
60}
61
62// NewAdminClient creates a new AdminClient for a given project and instance.
63func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) {
64	o, err := btopt.DefaultClientOptions(adminAddr, AdminScope, clientUserAgent)
65	if err != nil {
66		return nil, err
67	}
68	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
69	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
70	// Need to add scopes for long running operations (for create table & snapshots)
71	o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope))
72	o = append(o, opts...)
73	connPool, err := gtransport.DialPool(ctx, o...)
74	if err != nil {
75		return nil, fmt.Errorf("dialing: %v", err)
76	}
77
78	lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool))
79	if err != nil {
80		// This error "should not happen", since we are just reusing old connection
81		// and never actually need to dial.
82		// If this does happen, we could leak conn. However, we cannot close conn:
83		// If the user invoked the function with option.WithGRPCConn,
84		// we would close a connection that's still in use.
85		// TODO(pongad): investigate error conditions.
86		return nil, err
87	}
88
89	return &AdminClient{
90		connPool:  connPool,
91		tClient:   btapb.NewBigtableTableAdminClient(connPool),
92		lroClient: lroClient,
93		project:   project,
94		instance:  instance,
95		md:        metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)),
96	}, nil
97}
98
99// Close closes the AdminClient.
100func (ac *AdminClient) Close() error {
101	return ac.connPool.Close()
102}
103
104func (ac *AdminClient) instancePrefix() string {
105	return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
106}
107
108// Tables returns a list of the tables in the instance.
109func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
110	ctx = mergeOutgoingMetadata(ctx, ac.md)
111	prefix := ac.instancePrefix()
112	req := &btapb.ListTablesRequest{
113		Parent: prefix,
114	}
115
116	var res *btapb.ListTablesResponse
117	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
118		var err error
119		res, err = ac.tClient.ListTables(ctx, req)
120		return err
121	}, retryOptions...)
122	if err != nil {
123		return nil, err
124	}
125
126	names := make([]string, 0, len(res.Tables))
127	for _, tbl := range res.Tables {
128		names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/"))
129	}
130	return names, nil
131}
132
133// TableConf contains all of the information necessary to create a table with column families.
134type TableConf struct {
135	TableID   string
136	SplitKeys []string
137	// Families is a map from family name to GCPolicy
138	Families map[string]GCPolicy
139}
140
141// CreateTable creates a new table in the instance.
142// This method may return before the table's creation is complete.
143func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
144	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table})
145}
146
147// CreatePresplitTable creates a new table in the instance.
148// The list of row keys will be used to initially split the table into multiple tablets.
149// Given two split keys, "s1" and "s2", three tablets will be created,
150// spanning the key ranges: [, s1), [s1, s2), [s2, ).
151// This method may return before the table's creation is complete.
152func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error {
153	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys})
154}
155
156// CreateTableFromConf creates a new table in the instance from the given configuration.
157func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error {
158	ctx = mergeOutgoingMetadata(ctx, ac.md)
159	var reqSplits []*btapb.CreateTableRequest_Split
160	for _, split := range conf.SplitKeys {
161		reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)})
162	}
163	var tbl btapb.Table
164	if conf.Families != nil {
165		tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
166		for fam, policy := range conf.Families {
167			tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()}
168		}
169	}
170	prefix := ac.instancePrefix()
171	req := &btapb.CreateTableRequest{
172		Parent:        prefix,
173		TableId:       conf.TableID,
174		Table:         &tbl,
175		InitialSplits: reqSplits,
176	}
177	_, err := ac.tClient.CreateTable(ctx, req)
178	return err
179}
180
181// CreateColumnFamily creates a new column family in a table.
182func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error {
183	// TODO(dsymonds): Permit specifying gcexpr and any other family settings.
184	ctx = mergeOutgoingMetadata(ctx, ac.md)
185	prefix := ac.instancePrefix()
186	req := &btapb.ModifyColumnFamiliesRequest{
187		Name: prefix + "/tables/" + table,
188		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
189			Id:  family,
190			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
191		}},
192	}
193	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
194	return err
195}
196
197// DeleteTable deletes a table and all of its data.
198func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
199	ctx = mergeOutgoingMetadata(ctx, ac.md)
200	prefix := ac.instancePrefix()
201	req := &btapb.DeleteTableRequest{
202		Name: prefix + "/tables/" + table,
203	}
204	_, err := ac.tClient.DeleteTable(ctx, req)
205	return err
206}
207
208// DeleteColumnFamily deletes a column family in a table and all of its data.
209func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error {
210	ctx = mergeOutgoingMetadata(ctx, ac.md)
211	prefix := ac.instancePrefix()
212	req := &btapb.ModifyColumnFamiliesRequest{
213		Name: prefix + "/tables/" + table,
214		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
215			Id:  family,
216			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true},
217		}},
218	}
219	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
220	return err
221}
222
223// TableInfo represents information about a table.
224type TableInfo struct {
225	// DEPRECATED - This field is deprecated. Please use FamilyInfos instead.
226	Families    []string
227	FamilyInfos []FamilyInfo
228}
229
230// FamilyInfo represents information about a column family.
231type FamilyInfo struct {
232	Name     string
233	GCPolicy string
234}
235
236// TableInfo retrieves information about a table.
237func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) {
238	ctx = mergeOutgoingMetadata(ctx, ac.md)
239	prefix := ac.instancePrefix()
240	req := &btapb.GetTableRequest{
241		Name: prefix + "/tables/" + table,
242	}
243
244	var res *btapb.Table
245
246	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
247		var err error
248		res, err = ac.tClient.GetTable(ctx, req)
249		return err
250	}, retryOptions...)
251	if err != nil {
252		return nil, err
253	}
254
255	ti := &TableInfo{}
256	for name, fam := range res.ColumnFamilies {
257		ti.Families = append(ti.Families, name)
258		ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)})
259	}
260	return ti, nil
261}
262
263// SetGCPolicy specifies which cells in a column family should be garbage collected.
264// GC executes opportunistically in the background; table reads may return data
265// matching the GC policy.
266func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
267	ctx = mergeOutgoingMetadata(ctx, ac.md)
268	prefix := ac.instancePrefix()
269	req := &btapb.ModifyColumnFamiliesRequest{
270		Name: prefix + "/tables/" + table,
271		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
272			Id:  family,
273			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}},
274		}},
275	}
276	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
277	return err
278}
279
280// DropRowRange permanently deletes a row range from the specified table.
281func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error {
282	ctx = mergeOutgoingMetadata(ctx, ac.md)
283	prefix := ac.instancePrefix()
284	req := &btapb.DropRowRangeRequest{
285		Name:   prefix + "/tables/" + table,
286		Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)},
287	}
288	_, err := ac.tClient.DropRowRange(ctx, req)
289	return err
290}
291
292// DropAllRows permanently deletes all rows from the specified table.
293func (ac *AdminClient) DropAllRows(ctx context.Context, table string) error {
294	ctx = mergeOutgoingMetadata(ctx, ac.md)
295	prefix := ac.instancePrefix()
296	req := &btapb.DropRowRangeRequest{
297		Name:   prefix + "/tables/" + table,
298		Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true},
299	}
300	_, err := ac.tClient.DropRowRange(ctx, req)
301	return err
302}
303
304// CreateTableFromSnapshot creates a table from snapshot.
305// The table will be created in the same cluster as the snapshot.
306//
307// This is a private alpha release of Cloud Bigtable snapshots. This feature
308// is not currently available to most Cloud Bigtable customers. This feature
309// might be changed in backward-incompatible ways and is not recommended for
310// production use. It is not subject to any SLA or deprecation policy.
311func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error {
312	ctx = mergeOutgoingMetadata(ctx, ac.md)
313	prefix := ac.instancePrefix()
314	snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot
315
316	req := &btapb.CreateTableFromSnapshotRequest{
317		Parent:         prefix,
318		TableId:        table,
319		SourceSnapshot: snapshotPath,
320	}
321	op, err := ac.tClient.CreateTableFromSnapshot(ctx, req)
322	if err != nil {
323		return err
324	}
325	resp := btapb.Table{}
326	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
327}
328
329// DefaultSnapshotDuration is the default TTL for a snapshot.
330const DefaultSnapshotDuration time.Duration = 0
331
332// SnapshotTable creates a new snapshot in the specified cluster from the
333// specified source table. Setting the TTL to `DefaultSnapshotDuration` will
334// use the server side default for the duration.
335//
336// This is a private alpha release of Cloud Bigtable snapshots. This feature
337// is not currently available to most Cloud Bigtable customers. This feature
338// might be changed in backward-incompatible ways and is not recommended for
339// production use. It is not subject to any SLA or deprecation policy.
340func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error {
341	ctx = mergeOutgoingMetadata(ctx, ac.md)
342	prefix := ac.instancePrefix()
343
344	var ttlProto *durpb.Duration
345
346	if ttl > 0 {
347		ttlProto = ptypes.DurationProto(ttl)
348	}
349
350	req := &btapb.SnapshotTableRequest{
351		Name:       prefix + "/tables/" + table,
352		Cluster:    prefix + "/clusters/" + cluster,
353		SnapshotId: snapshot,
354		Ttl:        ttlProto,
355	}
356
357	op, err := ac.tClient.SnapshotTable(ctx, req)
358	if err != nil {
359		return err
360	}
361	resp := btapb.Snapshot{}
362	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
363}
364
365// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster.
366// To list snapshots across all of the clusters in the instance specify "-" as the cluster.
367//
368// This is a private alpha release of Cloud Bigtable snapshots. This feature is not
369// currently available to most Cloud Bigtable customers. This feature might be
370// changed in backward-incompatible ways and is not recommended for production use.
371// It is not subject to any SLA or deprecation policy.
372func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator {
373	ctx = mergeOutgoingMetadata(ctx, ac.md)
374	prefix := ac.instancePrefix()
375	clusterPath := prefix + "/clusters/" + cluster
376
377	it := &SnapshotIterator{}
378	req := &btapb.ListSnapshotsRequest{
379		Parent: clusterPath,
380	}
381
382	fetch := func(pageSize int, pageToken string) (string, error) {
383		req.PageToken = pageToken
384		if pageSize > math.MaxInt32 {
385			req.PageSize = math.MaxInt32
386		} else {
387			req.PageSize = int32(pageSize)
388		}
389
390		var resp *btapb.ListSnapshotsResponse
391		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
392			var err error
393			resp, err = ac.tClient.ListSnapshots(ctx, req)
394			return err
395		}, retryOptions...)
396		if err != nil {
397			return "", err
398		}
399		for _, s := range resp.Snapshots {
400			snapshotInfo, err := newSnapshotInfo(s)
401			if err != nil {
402				return "", fmt.Errorf("failed to parse snapshot proto %v", err)
403			}
404			it.items = append(it.items, snapshotInfo)
405		}
406		return resp.NextPageToken, nil
407	}
408	bufLen := func() int { return len(it.items) }
409	takeBuf := func() interface{} { b := it.items; it.items = nil; return b }
410
411	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
412
413	return it
414}
415
416func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) {
417	nameParts := strings.Split(snapshot.Name, "/")
418	name := nameParts[len(nameParts)-1]
419	tablePathParts := strings.Split(snapshot.SourceTable.Name, "/")
420	tableID := tablePathParts[len(tablePathParts)-1]
421
422	createTime, err := ptypes.Timestamp(snapshot.CreateTime)
423	if err != nil {
424		return nil, fmt.Errorf("invalid createTime: %v", err)
425	}
426
427	deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime)
428	if err != nil {
429		return nil, fmt.Errorf("invalid deleteTime: %v", err)
430	}
431
432	return &SnapshotInfo{
433		Name:        name,
434		SourceTable: tableID,
435		DataSize:    snapshot.DataSizeBytes,
436		CreateTime:  createTime,
437		DeleteTime:  deleteTime,
438	}, nil
439}
440
441// SnapshotIterator is an EntryIterator that iterates over log entries.
442//
443// This is a private alpha release of Cloud Bigtable snapshots. This feature
444// is not currently available to most Cloud Bigtable customers. This feature
445// might be changed in backward-incompatible ways and is not recommended for
446// production use. It is not subject to any SLA or deprecation policy.
447type SnapshotIterator struct {
448	items    []*SnapshotInfo
449	pageInfo *iterator.PageInfo
450	nextFunc func() error
451}
452
453// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
454func (it *SnapshotIterator) PageInfo() *iterator.PageInfo {
455	return it.pageInfo
456}
457
458// Next returns the next result. Its second return value is iterator.Done
459// (https://godoc.org/google.golang.org/api/iterator) if there are no more
460// results. Once Next returns Done, all subsequent calls will return Done.
461func (it *SnapshotIterator) Next() (*SnapshotInfo, error) {
462	if err := it.nextFunc(); err != nil {
463		return nil, err
464	}
465	item := it.items[0]
466	it.items = it.items[1:]
467	return item, nil
468}
469
470// SnapshotInfo contains snapshot metadata.
471type SnapshotInfo struct {
472	Name        string
473	SourceTable string
474	DataSize    int64
475	CreateTime  time.Time
476	DeleteTime  time.Time
477}
478
479// SnapshotInfo gets snapshot metadata.
480//
481// This is a private alpha release of Cloud Bigtable snapshots. This feature
482// is not currently available to most Cloud Bigtable customers. This feature
483// might be changed in backward-incompatible ways and is not recommended for
484// production use. It is not subject to any SLA or deprecation policy.
485func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) {
486	ctx = mergeOutgoingMetadata(ctx, ac.md)
487	prefix := ac.instancePrefix()
488	clusterPath := prefix + "/clusters/" + cluster
489	snapshotPath := clusterPath + "/snapshots/" + snapshot
490
491	req := &btapb.GetSnapshotRequest{
492		Name: snapshotPath,
493	}
494
495	var resp *btapb.Snapshot
496	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
497		var err error
498		resp, err = ac.tClient.GetSnapshot(ctx, req)
499		return err
500	}, retryOptions...)
501	if err != nil {
502		return nil, err
503	}
504
505	return newSnapshotInfo(resp)
506}
507
508// DeleteSnapshot deletes a snapshot in a cluster.
509//
510// This is a private alpha release of Cloud Bigtable snapshots. This feature
511// is not currently available to most Cloud Bigtable customers. This feature
512// might be changed in backward-incompatible ways and is not recommended for
513// production use. It is not subject to any SLA or deprecation policy.
514func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error {
515	ctx = mergeOutgoingMetadata(ctx, ac.md)
516	prefix := ac.instancePrefix()
517	clusterPath := prefix + "/clusters/" + cluster
518	snapshotPath := clusterPath + "/snapshots/" + snapshot
519
520	req := &btapb.DeleteSnapshotRequest{
521		Name: snapshotPath,
522	}
523	_, err := ac.tClient.DeleteSnapshot(ctx, req)
524	return err
525}
526
527// getConsistencyToken gets the consistency token for a table.
528func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) {
529	req := &btapb.GenerateConsistencyTokenRequest{
530		Name: tableName,
531	}
532	resp, err := ac.tClient.GenerateConsistencyToken(ctx, req)
533	if err != nil {
534		return "", err
535	}
536	return resp.GetConsistencyToken(), nil
537}
538
539// isConsistent checks if a token is consistent for a table.
540func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) {
541	req := &btapb.CheckConsistencyRequest{
542		Name:             tableName,
543		ConsistencyToken: token,
544	}
545	var resp *btapb.CheckConsistencyResponse
546
547	// Retry calls on retryable errors to avoid losing the token gathered before.
548	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
549		var err error
550		resp, err = ac.tClient.CheckConsistency(ctx, req)
551		return err
552	}, retryOptions...)
553	if err != nil {
554		return false, err
555	}
556	return resp.GetConsistent(), nil
557}
558
559// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication.
560func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error {
561	ctx = mergeOutgoingMetadata(ctx, ac.md)
562	// Get the token.
563	prefix := ac.instancePrefix()
564	tableName := prefix + "/tables/" + table
565	token, err := ac.getConsistencyToken(ctx, tableName)
566	if err != nil {
567		return err
568	}
569
570	// Periodically check if the token is consistent.
571	timer := time.NewTicker(time.Second * 10)
572	defer timer.Stop()
573	for {
574		consistent, err := ac.isConsistent(ctx, tableName, token)
575		if err != nil {
576			return err
577		}
578		if consistent {
579			return nil
580		}
581		// Sleep for a bit or until the ctx is cancelled.
582		select {
583		case <-ctx.Done():
584			return ctx.Err()
585		case <-timer.C:
586		}
587	}
588}
589
590// TableIAM creates an IAM client specific to a given Instance and Table within the configured project.
591func (ac *AdminClient) TableIAM(tableID string) *iam.Handle {
592	return iam.InternalNewHandleGRPCClient(ac.tClient,
593		"projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID)
594}
595
596const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
597
598// InstanceAdminClient is a client type for performing admin operations on instances.
599// These operations can be substantially more dangerous than those provided by AdminClient.
600type InstanceAdminClient struct {
601	connPool  gtransport.ConnPool
602	iClient   btapb.BigtableInstanceAdminClient
603	lroClient *lroauto.OperationsClient
604
605	project string
606
607	// Metadata to be sent with each request.
608	md metadata.MD
609}
610
611// NewInstanceAdminClient creates a new InstanceAdminClient for a given project.
612func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) {
613	o, err := btopt.DefaultClientOptions(instanceAdminAddr, InstanceAdminScope, clientUserAgent)
614	if err != nil {
615		return nil, err
616	}
617	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
618	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
619	o = append(o, opts...)
620	connPool, err := gtransport.DialPool(ctx, o...)
621	if err != nil {
622		return nil, fmt.Errorf("dialing: %v", err)
623	}
624
625	lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool))
626	if err != nil {
627		// This error "should not happen", since we are just reusing old connection
628		// and never actually need to dial.
629		// If this does happen, we could leak conn. However, we cannot close conn:
630		// If the user invoked the function with option.WithGRPCConn,
631		// we would close a connection that's still in use.
632		// TODO(pongad): investigate error conditions.
633		return nil, err
634	}
635
636	return &InstanceAdminClient{
637		connPool:  connPool,
638		iClient:   btapb.NewBigtableInstanceAdminClient(connPool),
639		lroClient: lroClient,
640
641		project: project,
642		md:      metadata.Pairs(resourcePrefixHeader, "projects/"+project),
643	}, nil
644}
645
646// Close closes the InstanceAdminClient.
647func (iac *InstanceAdminClient) Close() error {
648	return iac.connPool.Close()
649}
650
651// StorageType is the type of storage used for all tables in an instance
652type StorageType int
653
654const (
655	SSD StorageType = iota
656	HDD
657)
658
659func (st StorageType) proto() btapb.StorageType {
660	if st == HDD {
661		return btapb.StorageType_HDD
662	}
663	return btapb.StorageType_SSD
664}
665
666func storageTypeFromProto(st btapb.StorageType) StorageType {
667	if st == btapb.StorageType_HDD {
668		return HDD
669	}
670
671	return SSD
672}
673
674// InstanceType is the type of the instance
675type InstanceType int32
676
677const (
678	// UNSPECIFIED instance types default to PRODUCTION
679	UNSPECIFIED InstanceType = InstanceType(btapb.Instance_TYPE_UNSPECIFIED)
680	PRODUCTION               = InstanceType(btapb.Instance_PRODUCTION)
681	DEVELOPMENT              = InstanceType(btapb.Instance_DEVELOPMENT)
682)
683
684// InstanceInfo represents information about an instance
685type InstanceInfo struct {
686	Name         string // name of the instance
687	DisplayName  string // display name for UIs
688	InstanceType InstanceType
689}
690
691// InstanceConf contains the information necessary to create an Instance
692type InstanceConf struct {
693	InstanceId, DisplayName, ClusterId, Zone string
694	// NumNodes must not be specified for DEVELOPMENT instance types
695	NumNodes     int32
696	StorageType  StorageType
697	InstanceType InstanceType
698}
699
700// InstanceWithClustersConfig contains the information necessary to create an Instance
701type InstanceWithClustersConfig struct {
702	InstanceID, DisplayName string
703	Clusters                []ClusterConfig
704	InstanceType            InstanceType
705}
706
707var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`)
708
709// CreateInstance creates a new instance in the project.
710// This method will return when the instance has been created or when an error occurs.
711func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
712	ctx = mergeOutgoingMetadata(ctx, iac.md)
713	newConfig := InstanceWithClustersConfig{
714		InstanceID:   conf.InstanceId,
715		DisplayName:  conf.DisplayName,
716		InstanceType: conf.InstanceType,
717		Clusters: []ClusterConfig{
718			{
719				InstanceID:  conf.InstanceId,
720				ClusterID:   conf.ClusterId,
721				Zone:        conf.Zone,
722				NumNodes:    conf.NumNodes,
723				StorageType: conf.StorageType,
724			},
725		},
726	}
727	return iac.CreateInstanceWithClusters(ctx, &newConfig)
728}
729
730// CreateInstanceWithClusters creates a new instance with configured clusters in the project.
731// This method will return when the instance has been created or when an error occurs.
732func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
733	ctx = mergeOutgoingMetadata(ctx, iac.md)
734	clusters := make(map[string]*btapb.Cluster)
735	for _, cluster := range conf.Clusters {
736		clusters[cluster.ClusterID] = cluster.proto(iac.project)
737	}
738
739	req := &btapb.CreateInstanceRequest{
740		Parent:     "projects/" + iac.project,
741		InstanceId: conf.InstanceID,
742		Instance:   &btapb.Instance{DisplayName: conf.DisplayName, Type: btapb.Instance_Type(conf.InstanceType)},
743		Clusters:   clusters,
744	}
745
746	lro, err := iac.iClient.CreateInstance(ctx, req)
747	if err != nil {
748		return err
749	}
750	resp := btapb.Instance{}
751	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
752}
753
754// updateInstance updates a single instance based on config fields that operate
755// at an instance level: DisplayName and InstanceType.
756func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *InstanceWithClustersConfig) (updated bool, err error) {
757	if conf.InstanceID == "" {
758		return false, errors.New("InstanceID is required")
759	}
760
761	// Update the instance, if necessary
762	mask := &field_mask.FieldMask{}
763	ireq := &btapb.PartialUpdateInstanceRequest{
764		Instance: &btapb.Instance{
765			Name: "projects/" + iac.project + "/instances/" + conf.InstanceID,
766		},
767		UpdateMask: mask,
768	}
769	if conf.DisplayName != "" {
770		ireq.Instance.DisplayName = conf.DisplayName
771		mask.Paths = append(mask.Paths, "display_name")
772	}
773	if btapb.Instance_Type(conf.InstanceType) != btapb.Instance_TYPE_UNSPECIFIED {
774		ireq.Instance.Type = btapb.Instance_Type(conf.InstanceType)
775		mask.Paths = append(mask.Paths, "type")
776	}
777
778	if len(mask.Paths) == 0 {
779		return false, nil
780	}
781
782	lro, err := iac.iClient.PartialUpdateInstance(ctx, ireq)
783	if err != nil {
784		return false, err
785	}
786	err = longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
787	if err != nil {
788		return false, err
789	}
790
791	return true, nil
792}
793
794// UpdateInstanceWithClusters updates an instance and its clusters. Updateable
795// fields are instance display name, instance type and cluster size.
796// The provided InstanceWithClustersConfig is used as follows:
797// - InstanceID is required
798// - DisplayName and InstanceType are updated only if they are not empty
799// - ClusterID is required for any provided cluster
800// - All other cluster fields are ignored except for NumNodes, which if set will be updated
801//
802// This method may return an error after partially succeeding, for example if the instance is updated
803// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
804// determine the current state.
805func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
806	ctx = mergeOutgoingMetadata(ctx, iac.md)
807
808	for _, cluster := range conf.Clusters {
809		if cluster.ClusterID == "" {
810			return errors.New("ClusterID is required for every cluster")
811		}
812	}
813
814	updatedInstance, err := iac.updateInstance(ctx, conf)
815	if err != nil {
816		return err
817	}
818
819	// Update any clusters
820	for _, cluster := range conf.Clusters {
821		err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
822		if err != nil {
823			if updatedInstance {
824				// We updated the instance, so note that in the error message.
825				return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded",
826					cluster.ClusterID, err)
827			}
828			return err
829		}
830	}
831
832	return nil
833}
834
835// DeleteInstance deletes an instance from the project.
836func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error {
837	ctx = mergeOutgoingMetadata(ctx, iac.md)
838	req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID}
839	_, err := iac.iClient.DeleteInstance(ctx, req)
840	return err
841}
842
843// Instances returns a list of instances in the project.
844func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) {
845	ctx = mergeOutgoingMetadata(ctx, iac.md)
846	req := &btapb.ListInstancesRequest{
847		Parent: "projects/" + iac.project,
848	}
849	var res *btapb.ListInstancesResponse
850	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
851		var err error
852		res, err = iac.iClient.ListInstances(ctx, req)
853		return err
854	}, retryOptions...)
855	if err != nil {
856		return nil, err
857	}
858	if len(res.FailedLocations) > 0 {
859		// We don't have a good way to return a partial result in the face of some zones being unavailable.
860		// Fail the entire request.
861		return nil, status.Errorf(codes.Unavailable, "Failed locations: %v", res.FailedLocations)
862	}
863
864	var is []*InstanceInfo
865	for _, i := range res.Instances {
866		m := instanceNameRegexp.FindStringSubmatch(i.Name)
867		if m == nil {
868			return nil, fmt.Errorf("malformed instance name %q", i.Name)
869		}
870		is = append(is, &InstanceInfo{
871			Name:         m[2],
872			DisplayName:  i.DisplayName,
873			InstanceType: InstanceType(i.Type),
874		})
875	}
876	return is, nil
877}
878
879// InstanceInfo returns information about an instance.
880func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) {
881	ctx = mergeOutgoingMetadata(ctx, iac.md)
882	req := &btapb.GetInstanceRequest{
883		Name: "projects/" + iac.project + "/instances/" + instanceID,
884	}
885	var res *btapb.Instance
886	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
887		var err error
888		res, err = iac.iClient.GetInstance(ctx, req)
889		return err
890	}, retryOptions...)
891	if err != nil {
892		return nil, err
893	}
894
895	m := instanceNameRegexp.FindStringSubmatch(res.Name)
896	if m == nil {
897		return nil, fmt.Errorf("malformed instance name %q", res.Name)
898	}
899	return &InstanceInfo{
900		Name:         m[2],
901		DisplayName:  res.DisplayName,
902		InstanceType: InstanceType(res.Type),
903	}, nil
904}
905
906// ClusterConfig contains the information necessary to create a cluster
907type ClusterConfig struct {
908	InstanceID, ClusterID, Zone string
909	NumNodes                    int32
910	StorageType                 StorageType
911}
912
913func (cc *ClusterConfig) proto(project string) *btapb.Cluster {
914	return &btapb.Cluster{
915		ServeNodes:         cc.NumNodes,
916		DefaultStorageType: cc.StorageType.proto(),
917		Location:           "projects/" + project + "/locations/" + cc.Zone,
918	}
919}
920
921// ClusterInfo represents information about a cluster.
922type ClusterInfo struct {
923	Name        string      // name of the cluster
924	Zone        string      // GCP zone of the cluster (e.g. "us-central1-a")
925	ServeNodes  int         // number of allocated serve nodes
926	State       string      // state of the cluster
927	StorageType StorageType // the storage type of the cluster
928}
929
930// CreateCluster creates a new cluster in an instance.
931// This method will return when the cluster has been created or when an error occurs.
932func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error {
933	ctx = mergeOutgoingMetadata(ctx, iac.md)
934
935	req := &btapb.CreateClusterRequest{
936		Parent:    "projects/" + iac.project + "/instances/" + conf.InstanceID,
937		ClusterId: conf.ClusterID,
938		Cluster:   conf.proto(iac.project),
939	}
940
941	lro, err := iac.iClient.CreateCluster(ctx, req)
942	if err != nil {
943		return err
944	}
945	resp := btapb.Cluster{}
946	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
947}
948
949// DeleteCluster deletes a cluster from an instance.
950func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error {
951	ctx = mergeOutgoingMetadata(ctx, iac.md)
952	req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
953	_, err := iac.iClient.DeleteCluster(ctx, req)
954	return err
955}
956
957// UpdateCluster updates attributes of a cluster
958func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error {
959	ctx = mergeOutgoingMetadata(ctx, iac.md)
960	cluster := &btapb.Cluster{
961		Name:       "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
962		ServeNodes: serveNodes}
963	lro, err := iac.iClient.UpdateCluster(ctx, cluster)
964	if err != nil {
965		return err
966	}
967	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
968}
969
970// Clusters lists the clusters in an instance.
971func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) {
972	ctx = mergeOutgoingMetadata(ctx, iac.md)
973	req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID}
974	var res *btapb.ListClustersResponse
975	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
976		var err error
977		res, err = iac.iClient.ListClusters(ctx, req)
978		return err
979	}, retryOptions...)
980	if err != nil {
981		return nil, err
982	}
983	// TODO(garyelliott): Deal with failed_locations.
984	var cis []*ClusterInfo
985	for _, c := range res.Clusters {
986		nameParts := strings.Split(c.Name, "/")
987		locParts := strings.Split(c.Location, "/")
988		cis = append(cis, &ClusterInfo{
989			Name:        nameParts[len(nameParts)-1],
990			Zone:        locParts[len(locParts)-1],
991			ServeNodes:  int(c.ServeNodes),
992			State:       c.State.String(),
993			StorageType: storageTypeFromProto(c.DefaultStorageType),
994		})
995	}
996	return cis, nil
997}
998
999// GetCluster fetches a cluster in an instance
1000func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) {
1001	ctx = mergeOutgoingMetadata(ctx, iac.md)
1002	req := &btapb.GetClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
1003	var c *btapb.Cluster
1004	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1005		var err error
1006		c, err = iac.iClient.GetCluster(ctx, req)
1007		return err
1008	}, retryOptions...)
1009	if err != nil {
1010		return nil, err
1011	}
1012
1013	nameParts := strings.Split(c.Name, "/")
1014	locParts := strings.Split(c.Location, "/")
1015	cis := &ClusterInfo{
1016		Name:        nameParts[len(nameParts)-1],
1017		Zone:        locParts[len(locParts)-1],
1018		ServeNodes:  int(c.ServeNodes),
1019		State:       c.State.String(),
1020		StorageType: storageTypeFromProto(c.DefaultStorageType),
1021	}
1022	return cis, nil
1023}
1024
1025// InstanceIAM returns the instance's IAM handle.
1026func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle {
1027	return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID)
1028}
1029
1030// Routing policies.
1031const (
1032	// MultiClusterRouting is a policy that allows read/write requests to be
1033	// routed to any cluster in the instance. Requests will will fail over to
1034	// another cluster in the event of transient errors or delays. Choosing
1035	// this option sacrifices read-your-writes consistency to improve
1036	// availability.
1037	MultiClusterRouting = "multi_cluster_routing_use_any"
1038	// SingleClusterRouting is a policy that unconditionally routes all
1039	// read/write requests to a specific cluster. This option preserves
1040	// read-your-writes consistency, but does not improve availability.
1041	SingleClusterRouting = "single_cluster_routing"
1042)
1043
1044// ProfileConf contains the information necessary to create an profile
1045type ProfileConf struct {
1046	Name                     string
1047	ProfileID                string
1048	InstanceID               string
1049	Etag                     string
1050	Description              string
1051	RoutingPolicy            string
1052	ClusterID                string
1053	AllowTransactionalWrites bool
1054
1055	// If true, warnings are ignored
1056	IgnoreWarnings bool
1057}
1058
1059// ProfileIterator iterates over profiles.
1060type ProfileIterator struct {
1061	items    []*btapb.AppProfile
1062	pageInfo *iterator.PageInfo
1063	nextFunc func() error
1064}
1065
1066// ProfileAttrsToUpdate define addrs to update during an Update call. If unset, no fields will be replaced.
1067type ProfileAttrsToUpdate struct {
1068	// If set, updates the description.
1069	Description optional.String
1070
1071	//If set, updates the routing policy.
1072	RoutingPolicy optional.String
1073
1074	//If RoutingPolicy is updated to SingleClusterRouting, set these fields as well.
1075	ClusterID                string
1076	AllowTransactionalWrites bool
1077
1078	// If true, warnings are ignored
1079	IgnoreWarnings bool
1080}
1081
1082// GetFieldMaskPath returns the field mask path.
1083func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string {
1084	path := make([]string, 0)
1085	if p.Description != nil {
1086		path = append(path, "description")
1087	}
1088
1089	if p.RoutingPolicy != nil {
1090		path = append(path, optional.ToString(p.RoutingPolicy))
1091	}
1092	return path
1093}
1094
1095// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
1096func (it *ProfileIterator) PageInfo() *iterator.PageInfo {
1097	return it.pageInfo
1098}
1099
1100// Next returns the next result. Its second return value is iterator.Done
1101// (https://godoc.org/google.golang.org/api/iterator) if there are no more
1102// results. Once Next returns Done, all subsequent calls will return Done.
1103func (it *ProfileIterator) Next() (*btapb.AppProfile, error) {
1104	if err := it.nextFunc(); err != nil {
1105		return nil, err
1106	}
1107	item := it.items[0]
1108	it.items = it.items[1:]
1109	return item, nil
1110}
1111
1112// CreateAppProfile creates an app profile within an instance.
1113func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) {
1114	ctx = mergeOutgoingMetadata(ctx, iac.md)
1115	parent := "projects/" + iac.project + "/instances/" + profile.InstanceID
1116	appProfile := &btapb.AppProfile{
1117		Etag:        profile.Etag,
1118		Description: profile.Description,
1119	}
1120
1121	if profile.RoutingPolicy == "" {
1122		return nil, errors.New("invalid routing policy")
1123	}
1124
1125	switch profile.RoutingPolicy {
1126	case MultiClusterRouting:
1127		appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1128			MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1129		}
1130	case SingleClusterRouting:
1131		appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1132			SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1133				ClusterId:                profile.ClusterID,
1134				AllowTransactionalWrites: profile.AllowTransactionalWrites,
1135			},
1136		}
1137	default:
1138		return nil, errors.New("invalid routing policy")
1139	}
1140
1141	return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{
1142		Parent:         parent,
1143		AppProfile:     appProfile,
1144		AppProfileId:   profile.ProfileID,
1145		IgnoreWarnings: profile.IgnoreWarnings,
1146	})
1147}
1148
1149// GetAppProfile gets information about an app profile.
1150func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) {
1151	ctx = mergeOutgoingMetadata(ctx, iac.md)
1152	profileRequest := &btapb.GetAppProfileRequest{
1153		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1154	}
1155	var ap *btapb.AppProfile
1156	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1157		var err error
1158		ap, err = iac.iClient.GetAppProfile(ctx, profileRequest)
1159		return err
1160	}, retryOptions...)
1161	if err != nil {
1162		return nil, err
1163	}
1164	return ap, err
1165}
1166
1167// ListAppProfiles lists information about app profiles in an instance.
1168func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator {
1169	ctx = mergeOutgoingMetadata(ctx, iac.md)
1170	listRequest := &btapb.ListAppProfilesRequest{
1171		Parent: "projects/" + iac.project + "/instances/" + instanceID,
1172	}
1173
1174	pit := &ProfileIterator{}
1175	fetch := func(pageSize int, pageToken string) (string, error) {
1176		listRequest.PageToken = pageToken
1177		var profileRes *btapb.ListAppProfilesResponse
1178		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1179			var err error
1180			profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest)
1181			return err
1182		}, retryOptions...)
1183		if err != nil {
1184			return "", err
1185		}
1186
1187		pit.items = append(pit.items, profileRes.AppProfiles...)
1188		return profileRes.NextPageToken, nil
1189	}
1190
1191	bufLen := func() int { return len(pit.items) }
1192	takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b }
1193	pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
1194	return pit
1195
1196}
1197
1198// UpdateAppProfile updates an app profile within an instance.
1199// updateAttrs should be set. If unset, all fields will be replaced.
1200func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error {
1201	ctx = mergeOutgoingMetadata(ctx, iac.md)
1202
1203	profile := &btapb.AppProfile{
1204		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID,
1205	}
1206
1207	if updateAttrs.Description != nil {
1208		profile.Description = optional.ToString(updateAttrs.Description)
1209	}
1210	if updateAttrs.RoutingPolicy != nil {
1211		switch optional.ToString(updateAttrs.RoutingPolicy) {
1212		case MultiClusterRouting:
1213			profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1214				MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1215			}
1216		case SingleClusterRouting:
1217			profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1218				SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1219					ClusterId:                updateAttrs.ClusterID,
1220					AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites,
1221				},
1222			}
1223		default:
1224			return errors.New("invalid routing policy")
1225		}
1226	}
1227	patchRequest := &btapb.UpdateAppProfileRequest{
1228		AppProfile: profile,
1229		UpdateMask: &field_mask.FieldMask{
1230			Paths: updateAttrs.GetFieldMaskPath(),
1231		},
1232		IgnoreWarnings: updateAttrs.IgnoreWarnings,
1233	}
1234	updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest)
1235	if err != nil {
1236		return err
1237	}
1238
1239	return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil)
1240
1241}
1242
1243// DeleteAppProfile deletes an app profile from an instance.
1244func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error {
1245	ctx = mergeOutgoingMetadata(ctx, iac.md)
1246	deleteProfileRequest := &btapb.DeleteAppProfileRequest{
1247		Name:           "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1248		IgnoreWarnings: true,
1249	}
1250	_, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest)
1251	return err
1252
1253}
1254
1255// UpdateInstanceResults contains information about the
1256// changes made after invoking UpdateInstanceAndSyncClusters.
1257type UpdateInstanceResults struct {
1258	InstanceUpdated bool
1259	CreatedClusters []string
1260	DeletedClusters []string
1261	UpdatedClusters []string
1262}
1263
1264func (r *UpdateInstanceResults) String() string {
1265	return fmt.Sprintf("Instance updated? %v Clusters added:%v Clusters deleted:%v Clusters updated:%v",
1266		r.InstanceUpdated, r.CreatedClusters, r.DeletedClusters, r.UpdatedClusters)
1267}
1268
1269func max(x, y int) int {
1270	if x > y {
1271		return x
1272	}
1273	return y
1274}
1275
1276// UpdateInstanceAndSyncClusters updates an instance and its clusters, and will synchronize the
1277// clusters in the instance with the provided clusters, creating and deleting them as necessary.
1278// The provided InstanceWithClustersConfig is used as follows:
1279// - InstanceID is required
1280// - DisplayName and InstanceType are updated only if they are not empty
1281// - ClusterID is required for any provided cluster
1282// - Any cluster present in conf.Clusters but not part of the instance will be created using CreateCluster
1283//   and the given ClusterConfig.
1284// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance
1285//   using DeleteCluster.
1286// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the
1287//   provided number of nodes if set.
1288//
1289// This method may return an error after partially succeeding, for example if the instance is updated
1290// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
1291// determine the current state. The return UpdateInstanceResults will describe the work done by the
1292// method, whether partial or complete.
1293func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient, conf *InstanceWithClustersConfig) (*UpdateInstanceResults, error) {
1294	ctx = mergeOutgoingMetadata(ctx, iac.md)
1295
1296	// First fetch the existing clusters so we know what to remove, add or update.
1297	existingClusters, err := iac.Clusters(ctx, conf.InstanceID)
1298	if err != nil {
1299		return nil, err
1300	}
1301
1302	updatedInstance, err := iac.updateInstance(ctx, conf)
1303	if err != nil {
1304		return nil, err
1305	}
1306
1307	results := &UpdateInstanceResults{InstanceUpdated: updatedInstance}
1308
1309	existingClusterNames := make(map[string]bool)
1310	for _, cluster := range existingClusters {
1311		existingClusterNames[cluster.Name] = true
1312	}
1313
1314	// Synchronize clusters that were passed in with the existing clusters in the instance.
1315	// First update any cluster we encounter that already exists in the instance.
1316	// Collect the clusters that we will create and delete so that we can minimize disruption
1317	// of the instance.
1318	clustersToCreate := list.New()
1319	clustersToDelete := list.New()
1320	for _, cluster := range conf.Clusters {
1321		_, clusterExists := existingClusterNames[cluster.ClusterID]
1322		if !clusterExists {
1323			// The cluster doesn't exist yet, so we must create it.
1324			clustersToCreate.PushBack(cluster)
1325			continue
1326		}
1327		delete(existingClusterNames, cluster.ClusterID)
1328
1329		if cluster.NumNodes <= 0 {
1330			// We only synchronize clusters with a valid number of nodes.
1331			continue
1332		}
1333
1334		// We simply want to update this cluster
1335		err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
1336		if err != nil {
1337			return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
1338				cluster.ClusterID, err, results)
1339		}
1340		results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
1341	}
1342
1343	// Any cluster left in existingClusterNames was NOT in the given config and should be deleted.
1344	for clusterToDelete := range existingClusterNames {
1345		clustersToDelete.PushBack(clusterToDelete)
1346	}
1347
1348	// Now that we have the clusters that we need to create and delete, we do so keeping the following
1349	// in mind:
1350	// - Don't delete the last cluster in the instance, as that will result in an error.
1351	// - Attempt to offset each deletion with a creation before another deletion, so that instance
1352	//   capacity is never reduced more than necessary.
1353	// Note that there is a limit on number of clusters in an instance which we are not aware of here,
1354	// so delete a cluster before adding one (as long as there are > 1 clusters left) so that we are
1355	// less likely to exceed the maximum number of clusters.
1356	numExistingClusters := len(existingClusters)
1357	nextCreation := clustersToCreate.Front()
1358	nextDeletion := clustersToDelete.Front()
1359	for {
1360		// We are done when both lists are empty.
1361		if nextCreation == nil && nextDeletion == nil {
1362			break
1363		}
1364
1365		// If there is more than one existing cluster, we always want to delete first if possible.
1366		// If there are no more creations left, always go ahead with the deletion.
1367		if (numExistingClusters > 1 && nextDeletion != nil) || nextCreation == nil {
1368			clusterToDelete := nextDeletion.Value.(string)
1369			err = iac.DeleteCluster(ctx, conf.InstanceID, clusterToDelete)
1370			if err != nil {
1371				return results, fmt.Errorf("DeleteCluster %q failed %v; Progress: %v",
1372					clusterToDelete, err, results)
1373			}
1374			results.DeletedClusters = append(results.DeletedClusters, clusterToDelete)
1375			numExistingClusters--
1376			nextDeletion = nextDeletion.Next()
1377		}
1378
1379		// Now create a new cluster if required.
1380		if nextCreation != nil {
1381			clusterToCreate := nextCreation.Value.(ClusterConfig)
1382			// Assume the cluster config is well formed and rely on the underlying call to error out.
1383			// Make sure to set the InstanceID, though, since we know what it must be.
1384			clusterToCreate.InstanceID = conf.InstanceID
1385			err = iac.CreateCluster(ctx, &clusterToCreate)
1386			if err != nil {
1387				return results, fmt.Errorf("CreateCluster %v failed %v; Progress: %v",
1388					clusterToCreate, err, results)
1389			}
1390			results.CreatedClusters = append(results.CreatedClusters, clusterToCreate.ClusterID)
1391			numExistingClusters++
1392			nextCreation = nextCreation.Next()
1393		}
1394	}
1395
1396	return results, nil
1397}
1398