1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"math"
24	"regexp"
25	"strings"
26	"time"
27
28	"cloud.google.com/go/bigtable/internal/gax"
29	btopt "cloud.google.com/go/bigtable/internal/option"
30	"cloud.google.com/go/iam"
31	"cloud.google.com/go/internal/optional"
32	"cloud.google.com/go/longrunning"
33	lroauto "cloud.google.com/go/longrunning/autogen"
34	"github.com/golang/protobuf/ptypes"
35	durpb "github.com/golang/protobuf/ptypes/duration"
36	"google.golang.org/api/cloudresourcemanager/v1"
37	"google.golang.org/api/iterator"
38	"google.golang.org/api/option"
39	gtransport "google.golang.org/api/transport/grpc"
40	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
41	"google.golang.org/genproto/protobuf/field_mask"
42	"google.golang.org/grpc"
43	"google.golang.org/grpc/codes"
44	"google.golang.org/grpc/metadata"
45	"google.golang.org/grpc/status"
46)
47
// adminAddr is the default endpoint (host:port) that NewAdminClient hands to
// btopt.DefaultClientOptions when building its dial options.
const adminAddr = "bigtableadmin.googleapis.com:443"
49
// AdminClient is a client type for performing admin operations within a specific instance.
//
// A single gRPC connection (released by Close) backs both the table admin
// stub and the long-running operations client. The project and instance
// names are joined by instancePrefix into the
// "projects/<project>/instances/<instance>" prefix from which request
// resource names are built.
type AdminClient struct {
	conn      *grpc.ClientConn
	tClient   btapb.BigtableTableAdminClient
	lroClient *lroauto.OperationsClient

	project, instance string

	// Metadata to be sent with each request.
	md metadata.MD
}
61
62// NewAdminClient creates a new AdminClient for a given project and instance.
63func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) {
64	o, err := btopt.DefaultClientOptions(adminAddr, AdminScope, clientUserAgent)
65	if err != nil {
66		return nil, err
67	}
68	// Need to add scopes for long running operations (for create table & snapshots)
69	o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope))
70	o = append(o, opts...)
71	conn, err := gtransport.Dial(ctx, o...)
72	if err != nil {
73		return nil, fmt.Errorf("dialing: %v", err)
74	}
75
76	lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn))
77	if err != nil {
78		// This error "should not happen", since we are just reusing old connection
79		// and never actually need to dial.
80		// If this does happen, we could leak conn. However, we cannot close conn:
81		// If the user invoked the function with option.WithGRPCConn,
82		// we would close a connection that's still in use.
83		// TODO(pongad): investigate error conditions.
84		return nil, err
85	}
86
87	return &AdminClient{
88		conn:      conn,
89		tClient:   btapb.NewBigtableTableAdminClient(conn),
90		lroClient: lroClient,
91		project:   project,
92		instance:  instance,
93		md:        metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)),
94	}, nil
95}
96
97// Close closes the AdminClient.
98func (ac *AdminClient) Close() error {
99	return ac.conn.Close()
100}
101
102func (ac *AdminClient) instancePrefix() string {
103	return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
104}
105
106// Tables returns a list of the tables in the instance.
107func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
108	ctx = mergeOutgoingMetadata(ctx, ac.md)
109	prefix := ac.instancePrefix()
110	req := &btapb.ListTablesRequest{
111		Parent: prefix,
112	}
113
114	var res *btapb.ListTablesResponse
115	err := gax.Invoke(ctx, func(ctx context.Context) error {
116		var err error
117		res, err = ac.tClient.ListTables(ctx, req)
118		return err
119	}, retryOptions...)
120	if err != nil {
121		return nil, err
122	}
123
124	names := make([]string, 0, len(res.Tables))
125	for _, tbl := range res.Tables {
126		names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/"))
127	}
128	return names, nil
129}
130
// TableConf contains all of the information necessary to create a table with column families.
//
// TableID is sent as the ID of the new table. SplitKeys, if any, are sent as
// the table's initial splits. Families, when non-nil, lists the column
// families to create together with the table.
type TableConf struct {
	TableID   string
	SplitKeys []string
	// Families maps each column family name to the GCPolicy it is created with.
	Families map[string]GCPolicy
}
138
139// CreateTable creates a new table in the instance.
140// This method may return before the table's creation is complete.
141func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
142	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table})
143}
144
145// CreatePresplitTable creates a new table in the instance.
146// The list of row keys will be used to initially split the table into multiple tablets.
147// Given two split keys, "s1" and "s2", three tablets will be created,
148// spanning the key ranges: [, s1), [s1, s2), [s2, ).
149// This method may return before the table's creation is complete.
150func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error {
151	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys})
152}
153
154// CreateTableFromConf creates a new table in the instance from the given configuration.
155func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error {
156	ctx = mergeOutgoingMetadata(ctx, ac.md)
157	var reqSplits []*btapb.CreateTableRequest_Split
158	for _, split := range conf.SplitKeys {
159		reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)})
160	}
161	var tbl btapb.Table
162	if conf.Families != nil {
163		tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
164		for fam, policy := range conf.Families {
165			tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()}
166		}
167	}
168	prefix := ac.instancePrefix()
169	req := &btapb.CreateTableRequest{
170		Parent:        prefix,
171		TableId:       conf.TableID,
172		Table:         &tbl,
173		InitialSplits: reqSplits,
174	}
175	_, err := ac.tClient.CreateTable(ctx, req)
176	return err
177}
178
179// CreateColumnFamily creates a new column family in a table.
180func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error {
181	// TODO(dsymonds): Permit specifying gcexpr and any other family settings.
182	ctx = mergeOutgoingMetadata(ctx, ac.md)
183	prefix := ac.instancePrefix()
184	req := &btapb.ModifyColumnFamiliesRequest{
185		Name: prefix + "/tables/" + table,
186		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
187			Id:  family,
188			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
189		}},
190	}
191	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
192	return err
193}
194
195// DeleteTable deletes a table and all of its data.
196func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
197	ctx = mergeOutgoingMetadata(ctx, ac.md)
198	prefix := ac.instancePrefix()
199	req := &btapb.DeleteTableRequest{
200		Name: prefix + "/tables/" + table,
201	}
202	_, err := ac.tClient.DeleteTable(ctx, req)
203	return err
204}
205
206// DeleteColumnFamily deletes a column family in a table and all of its data.
207func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error {
208	ctx = mergeOutgoingMetadata(ctx, ac.md)
209	prefix := ac.instancePrefix()
210	req := &btapb.ModifyColumnFamiliesRequest{
211		Name: prefix + "/tables/" + table,
212		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
213			Id:  family,
214			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true},
215		}},
216	}
217	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
218	return err
219}
220
// TableInfo represents information about a table.
//
// FamilyInfos holds one entry per column family of the table; Families holds
// only the family names.
type TableInfo struct {
	// Families lists the names of the table's column families.
	//
	// Deprecated: Use FamilyInfos instead.
	Families    []string
	FamilyInfos []FamilyInfo
}
227
// FamilyInfo represents information about a column family.
//
// Name is the column family name. GCPolicy is the family's GC rule rendered
// as a string by GCRuleToString.
type FamilyInfo struct {
	Name     string
	GCPolicy string
}
233
234// TableInfo retrieves information about a table.
235func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) {
236	ctx = mergeOutgoingMetadata(ctx, ac.md)
237	prefix := ac.instancePrefix()
238	req := &btapb.GetTableRequest{
239		Name: prefix + "/tables/" + table,
240	}
241
242	var res *btapb.Table
243
244	err := gax.Invoke(ctx, func(ctx context.Context) error {
245		var err error
246		res, err = ac.tClient.GetTable(ctx, req)
247		return err
248	}, retryOptions...)
249	if err != nil {
250		return nil, err
251	}
252
253	ti := &TableInfo{}
254	for name, fam := range res.ColumnFamilies {
255		ti.Families = append(ti.Families, name)
256		ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)})
257	}
258	return ti, nil
259}
260
261// SetGCPolicy specifies which cells in a column family should be garbage collected.
262// GC executes opportunistically in the background; table reads may return data
263// matching the GC policy.
264func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
265	ctx = mergeOutgoingMetadata(ctx, ac.md)
266	prefix := ac.instancePrefix()
267	req := &btapb.ModifyColumnFamiliesRequest{
268		Name: prefix + "/tables/" + table,
269		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
270			Id:  family,
271			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}},
272		}},
273	}
274	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
275	return err
276}
277
278// DropRowRange permanently deletes a row range from the specified table.
279func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error {
280	ctx = mergeOutgoingMetadata(ctx, ac.md)
281	prefix := ac.instancePrefix()
282	req := &btapb.DropRowRangeRequest{
283		Name:   prefix + "/tables/" + table,
284		Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)},
285	}
286	_, err := ac.tClient.DropRowRange(ctx, req)
287	return err
288}
289
290// CreateTableFromSnapshot creates a table from snapshot.
291// The table will be created in the same cluster as the snapshot.
292//
293// This is a private alpha release of Cloud Bigtable snapshots. This feature
294// is not currently available to most Cloud Bigtable customers. This feature
295// might be changed in backward-incompatible ways and is not recommended for
296// production use. It is not subject to any SLA or deprecation policy.
297func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error {
298	ctx = mergeOutgoingMetadata(ctx, ac.md)
299	prefix := ac.instancePrefix()
300	snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot
301
302	req := &btapb.CreateTableFromSnapshotRequest{
303		Parent:         prefix,
304		TableId:        table,
305		SourceSnapshot: snapshotPath,
306	}
307	op, err := ac.tClient.CreateTableFromSnapshot(ctx, req)
308	if err != nil {
309		return err
310	}
311	resp := btapb.Table{}
312	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
313}
314
// DefaultSnapshotDuration is the default TTL for a snapshot. Its value is
// zero; SnapshotTable only sends a TTL in its request when the TTL is
// positive.
const DefaultSnapshotDuration time.Duration = 0
317
318// SnapshotTable creates a new snapshot in the specified cluster from the
319// specified source table. Setting the TTL to `DefaultSnapshotDuration` will
320// use the server side default for the duration.
321//
322// This is a private alpha release of Cloud Bigtable snapshots. This feature
323// is not currently available to most Cloud Bigtable customers. This feature
324// might be changed in backward-incompatible ways and is not recommended for
325// production use. It is not subject to any SLA or deprecation policy.
326func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error {
327	ctx = mergeOutgoingMetadata(ctx, ac.md)
328	prefix := ac.instancePrefix()
329
330	var ttlProto *durpb.Duration
331
332	if ttl > 0 {
333		ttlProto = ptypes.DurationProto(ttl)
334	}
335
336	req := &btapb.SnapshotTableRequest{
337		Name:       prefix + "/tables/" + table,
338		Cluster:    prefix + "/clusters/" + cluster,
339		SnapshotId: snapshot,
340		Ttl:        ttlProto,
341	}
342
343	op, err := ac.tClient.SnapshotTable(ctx, req)
344	if err != nil {
345		return err
346	}
347	resp := btapb.Snapshot{}
348	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
349}
350
351// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster.
352// To list snapshots across all of the clusters in the instance specify "-" as the cluster.
353//
354// This is a private alpha release of Cloud Bigtable snapshots. This feature is not
355// currently available to most Cloud Bigtable customers. This feature might be
356// changed in backward-incompatible ways and is not recommended for production use.
357// It is not subject to any SLA or deprecation policy.
358func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator {
359	ctx = mergeOutgoingMetadata(ctx, ac.md)
360	prefix := ac.instancePrefix()
361	clusterPath := prefix + "/clusters/" + cluster
362
363	it := &SnapshotIterator{}
364	req := &btapb.ListSnapshotsRequest{
365		Parent: clusterPath,
366	}
367
368	fetch := func(pageSize int, pageToken string) (string, error) {
369		req.PageToken = pageToken
370		if pageSize > math.MaxInt32 {
371			req.PageSize = math.MaxInt32
372		} else {
373			req.PageSize = int32(pageSize)
374		}
375
376		var resp *btapb.ListSnapshotsResponse
377		err := gax.Invoke(ctx, func(ctx context.Context) error {
378			var err error
379			resp, err = ac.tClient.ListSnapshots(ctx, req)
380			return err
381		}, retryOptions...)
382		if err != nil {
383			return "", err
384		}
385		for _, s := range resp.Snapshots {
386			snapshotInfo, err := newSnapshotInfo(s)
387			if err != nil {
388				return "", fmt.Errorf("failed to parse snapshot proto %v", err)
389			}
390			it.items = append(it.items, snapshotInfo)
391		}
392		return resp.NextPageToken, nil
393	}
394	bufLen := func() int { return len(it.items) }
395	takeBuf := func() interface{} { b := it.items; it.items = nil; return b }
396
397	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
398
399	return it
400}
401
402func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) {
403	nameParts := strings.Split(snapshot.Name, "/")
404	name := nameParts[len(nameParts)-1]
405	tablePathParts := strings.Split(snapshot.SourceTable.Name, "/")
406	tableID := tablePathParts[len(tablePathParts)-1]
407
408	createTime, err := ptypes.Timestamp(snapshot.CreateTime)
409	if err != nil {
410		return nil, fmt.Errorf("invalid createTime: %v", err)
411	}
412
413	deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime)
414	if err != nil {
415		return nil, fmt.Errorf("invalid deleteTime: %v", err)
416	}
417
418	return &SnapshotInfo{
419		Name:        name,
420		SourceTable: tableID,
421		DataSize:    snapshot.DataSizeBytes,
422		CreateTime:  createTime,
423		DeleteTime:  deleteTime,
424	}, nil
425}
426
// SnapshotIterator iterates over snapshots, yielding one *SnapshotInfo per
// call to Next. Iterators are created by AdminClient.Snapshots.
//
// This is a private alpha release of Cloud Bigtable snapshots. This feature
// is not currently available to most Cloud Bigtable customers. This feature
// might be changed in backward-incompatible ways and is not recommended for
// production use. It is not subject to any SLA or deprecation policy.
type SnapshotIterator struct {
	items    []*SnapshotInfo
	pageInfo *iterator.PageInfo
	nextFunc func() error
}
438
439// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
440func (it *SnapshotIterator) PageInfo() *iterator.PageInfo {
441	return it.pageInfo
442}
443
444// Next returns the next result. Its second return value is iterator.Done
445// (https://godoc.org/google.golang.org/api/iterator) if there are no more
446// results. Once Next returns Done, all subsequent calls will return Done.
447func (it *SnapshotIterator) Next() (*SnapshotInfo, error) {
448	if err := it.nextFunc(); err != nil {
449		return nil, err
450	}
451	item := it.items[0]
452	it.items = it.items[1:]
453	return item, nil
454}
455
// SnapshotInfo contains snapshot metadata.
//
// Name and SourceTable are the final path segments of the snapshot's and its
// source table's resource names. DataSize is the snapshot's size in bytes as
// reported by the service. CreateTime and DeleteTime are converted from the
// snapshot's create and delete timestamps.
type SnapshotInfo struct {
	Name        string
	SourceTable string
	DataSize    int64
	CreateTime  time.Time
	DeleteTime  time.Time
}
464
465// SnapshotInfo gets snapshot metadata.
466//
467// This is a private alpha release of Cloud Bigtable snapshots. This feature
468// is not currently available to most Cloud Bigtable customers. This feature
469// might be changed in backward-incompatible ways and is not recommended for
470// production use. It is not subject to any SLA or deprecation policy.
471func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) {
472	ctx = mergeOutgoingMetadata(ctx, ac.md)
473	prefix := ac.instancePrefix()
474	clusterPath := prefix + "/clusters/" + cluster
475	snapshotPath := clusterPath + "/snapshots/" + snapshot
476
477	req := &btapb.GetSnapshotRequest{
478		Name: snapshotPath,
479	}
480
481	var resp *btapb.Snapshot
482	err := gax.Invoke(ctx, func(ctx context.Context) error {
483		var err error
484		resp, err = ac.tClient.GetSnapshot(ctx, req)
485		return err
486	}, retryOptions...)
487	if err != nil {
488		return nil, err
489	}
490
491	return newSnapshotInfo(resp)
492}
493
494// DeleteSnapshot deletes a snapshot in a cluster.
495//
496// This is a private alpha release of Cloud Bigtable snapshots. This feature
497// is not currently available to most Cloud Bigtable customers. This feature
498// might be changed in backward-incompatible ways and is not recommended for
499// production use. It is not subject to any SLA or deprecation policy.
500func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error {
501	ctx = mergeOutgoingMetadata(ctx, ac.md)
502	prefix := ac.instancePrefix()
503	clusterPath := prefix + "/clusters/" + cluster
504	snapshotPath := clusterPath + "/snapshots/" + snapshot
505
506	req := &btapb.DeleteSnapshotRequest{
507		Name: snapshotPath,
508	}
509	_, err := ac.tClient.DeleteSnapshot(ctx, req)
510	return err
511}
512
513// getConsistencyToken gets the consistency token for a table.
514func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) {
515	req := &btapb.GenerateConsistencyTokenRequest{
516		Name: tableName,
517	}
518	resp, err := ac.tClient.GenerateConsistencyToken(ctx, req)
519	if err != nil {
520		return "", err
521	}
522	return resp.GetConsistencyToken(), nil
523}
524
525// isConsistent checks if a token is consistent for a table.
526func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) {
527	req := &btapb.CheckConsistencyRequest{
528		Name:             tableName,
529		ConsistencyToken: token,
530	}
531	var resp *btapb.CheckConsistencyResponse
532
533	// Retry calls on retryable errors to avoid losing the token gathered before.
534	err := gax.Invoke(ctx, func(ctx context.Context) error {
535		var err error
536		resp, err = ac.tClient.CheckConsistency(ctx, req)
537		return err
538	}, retryOptions...)
539	if err != nil {
540		return false, err
541	}
542	return resp.GetConsistent(), nil
543}
544
545// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication.
546func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error {
547	// Get the token.
548	prefix := ac.instancePrefix()
549	tableName := prefix + "/tables/" + table
550	token, err := ac.getConsistencyToken(ctx, tableName)
551	if err != nil {
552		return err
553	}
554
555	// Periodically check if the token is consistent.
556	timer := time.NewTicker(time.Second * 10)
557	defer timer.Stop()
558	for {
559		consistent, err := ac.isConsistent(ctx, tableName, token)
560		if err != nil {
561			return err
562		}
563		if consistent {
564			return nil
565		}
566		// Sleep for a bit or until the ctx is cancelled.
567		select {
568		case <-ctx.Done():
569			return ctx.Err()
570		case <-timer.C:
571		}
572	}
573}
574
// instanceAdminAddr is the default endpoint (host:port) that
// NewInstanceAdminClient hands to btopt.DefaultClientOptions when building
// its dial options.
const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
576
// InstanceAdminClient is a client type for performing admin operations on instances.
// These operations can be substantially more dangerous than those provided by AdminClient.
//
// A single gRPC connection (released by Close) backs both the instance admin
// stub and the long-running operations client. The project name is used to
// build the "projects/<project>/..." resource names sent in requests.
type InstanceAdminClient struct {
	conn      *grpc.ClientConn
	iClient   btapb.BigtableInstanceAdminClient
	lroClient *lroauto.OperationsClient

	project string

	// Metadata to be sent with each request.
	md metadata.MD
}
589
590// NewInstanceAdminClient creates a new InstanceAdminClient for a given project.
591func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) {
592	o, err := btopt.DefaultClientOptions(instanceAdminAddr, InstanceAdminScope, clientUserAgent)
593	if err != nil {
594		return nil, err
595	}
596	o = append(o, opts...)
597	conn, err := gtransport.Dial(ctx, o...)
598	if err != nil {
599		return nil, fmt.Errorf("dialing: %v", err)
600	}
601
602	lroClient, err := lroauto.NewOperationsClient(ctx, option.WithGRPCConn(conn))
603	if err != nil {
604		// This error "should not happen", since we are just reusing old connection
605		// and never actually need to dial.
606		// If this does happen, we could leak conn. However, we cannot close conn:
607		// If the user invoked the function with option.WithGRPCConn,
608		// we would close a connection that's still in use.
609		// TODO(pongad): investigate error conditions.
610		return nil, err
611	}
612
613	return &InstanceAdminClient{
614		conn:      conn,
615		iClient:   btapb.NewBigtableInstanceAdminClient(conn),
616		lroClient: lroClient,
617
618		project: project,
619		md:      metadata.Pairs(resourcePrefixHeader, "projects/"+project),
620	}, nil
621}
622
623// Close closes the InstanceAdminClient.
624func (iac *InstanceAdminClient) Close() error {
625	return iac.conn.Close()
626}
627
// StorageType is the type of storage used for all tables in an instance.
type StorageType int

// Storage types. SSD is the zero value of StorageType.
const (
	// SSD is converted to btapb.StorageType_SSD.
	SSD StorageType = iota
	// HDD is converted to btapb.StorageType_HDD.
	HDD
)
635
636func (st StorageType) proto() btapb.StorageType {
637	if st == HDD {
638		return btapb.StorageType_HDD
639	}
640	return btapb.StorageType_SSD
641}
642
// InstanceType is the type of the instance.
type InstanceType int32

// Instance types. PRODUCTION and DEVELOPMENT carry the numeric values of
// btapb.Instance_PRODUCTION and btapb.Instance_DEVELOPMENT respectively, so
// an InstanceType can be converted directly to btapb.Instance_Type.
const (
	PRODUCTION  InstanceType = InstanceType(btapb.Instance_PRODUCTION)
	DEVELOPMENT              = InstanceType(btapb.Instance_DEVELOPMENT)
)
650
// InstanceInfo represents information about an instance.
//
// Name is the instance ID, i.e. the last segment of the instance's
// "projects/<project>/instances/<instance>" resource name.
type InstanceInfo struct {
	Name        string // name of the instance
	DisplayName string // display name for UIs
}
656
// InstanceConf contains the information necessary to create an Instance
// with a single cluster.
//
// CreateInstance copies InstanceId, DisplayName and InstanceType into the
// instance configuration and InstanceId, ClusterId, Zone, NumNodes and
// StorageType into the configuration of its one cluster.
type InstanceConf struct {
	InstanceId, DisplayName, ClusterId, Zone string
	// NumNodes must not be specified for DEVELOPMENT instance types
	NumNodes     int32
	StorageType  StorageType
	InstanceType InstanceType
}
665
// InstanceWithClustersConfig contains the information necessary to create an
// Instance together with one or more clusters.
//
// CreateInstanceWithClusters keys the clusters it sends by ClusterID, so
// entries in Clusters that share a ClusterID overwrite one another.
type InstanceWithClustersConfig struct {
	InstanceID, DisplayName string
	Clusters                []ClusterConfig
	InstanceType            InstanceType
}
672
// instanceNameRegexp matches a full instance resource name of the form
// "projects/<project>/instances/<instance>". Capture group 1 is the project
// and capture group 2 is the instance ID, which must start with a lowercase
// letter and continue with lowercase letters, digits or hyphens.
var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`)
674
675// CreateInstance creates a new instance in the project.
676// This method will return when the instance has been created or when an error occurs.
677func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
678	newConfig := InstanceWithClustersConfig{
679		InstanceID:   conf.InstanceId,
680		DisplayName:  conf.DisplayName,
681		InstanceType: conf.InstanceType,
682		Clusters: []ClusterConfig{
683			{
684				InstanceID:  conf.InstanceId,
685				ClusterID:   conf.ClusterId,
686				Zone:        conf.Zone,
687				NumNodes:    conf.NumNodes,
688				StorageType: conf.StorageType,
689			},
690		},
691	}
692	return iac.CreateInstanceWithClusters(ctx, &newConfig)
693}
694
695// CreateInstanceWithClusters creates a new instance with configured clusters in the project.
696// This method will return when the instance has been created or when an error occurs.
697func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
698	ctx = mergeOutgoingMetadata(ctx, iac.md)
699	clusters := make(map[string]*btapb.Cluster)
700	for _, cluster := range conf.Clusters {
701		clusters[cluster.ClusterID] = cluster.proto(iac.project)
702	}
703
704	req := &btapb.CreateInstanceRequest{
705		Parent:     "projects/" + iac.project,
706		InstanceId: conf.InstanceID,
707		Instance:   &btapb.Instance{DisplayName: conf.DisplayName, Type: btapb.Instance_Type(conf.InstanceType)},
708		Clusters:   clusters,
709	}
710
711	lro, err := iac.iClient.CreateInstance(ctx, req)
712	if err != nil {
713		return err
714	}
715	resp := btapb.Instance{}
716	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
717}
718
719// DeleteInstance deletes an instance from the project.
720func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error {
721	ctx = mergeOutgoingMetadata(ctx, iac.md)
722	req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID}
723	_, err := iac.iClient.DeleteInstance(ctx, req)
724	return err
725}
726
727// Instances returns a list of instances in the project.
728func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) {
729	ctx = mergeOutgoingMetadata(ctx, iac.md)
730	req := &btapb.ListInstancesRequest{
731		Parent: "projects/" + iac.project,
732	}
733	var res *btapb.ListInstancesResponse
734	err := gax.Invoke(ctx, func(ctx context.Context) error {
735		var err error
736		res, err = iac.iClient.ListInstances(ctx, req)
737		return err
738	}, retryOptions...)
739	if err != nil {
740		return nil, err
741	}
742	if len(res.FailedLocations) > 0 {
743		// We don't have a good way to return a partial result in the face of some zones being unavailable.
744		// Fail the entire request.
745		return nil, status.Errorf(codes.Unavailable, "Failed locations: %v", res.FailedLocations)
746	}
747
748	var is []*InstanceInfo
749	for _, i := range res.Instances {
750		m := instanceNameRegexp.FindStringSubmatch(i.Name)
751		if m == nil {
752			return nil, fmt.Errorf("malformed instance name %q", i.Name)
753		}
754		is = append(is, &InstanceInfo{
755			Name:        m[2],
756			DisplayName: i.DisplayName,
757		})
758	}
759	return is, nil
760}
761
762// InstanceInfo returns information about an instance.
763func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) {
764	ctx = mergeOutgoingMetadata(ctx, iac.md)
765	req := &btapb.GetInstanceRequest{
766		Name: "projects/" + iac.project + "/instances/" + instanceID,
767	}
768	var res *btapb.Instance
769	err := gax.Invoke(ctx, func(ctx context.Context) error {
770		var err error
771		res, err = iac.iClient.GetInstance(ctx, req)
772		return err
773	}, retryOptions...)
774	if err != nil {
775		return nil, err
776	}
777
778	m := instanceNameRegexp.FindStringSubmatch(res.Name)
779	if m == nil {
780		return nil, fmt.Errorf("malformed instance name %q", res.Name)
781	}
782	return &InstanceInfo{
783		Name:        m[2],
784		DisplayName: res.DisplayName,
785	}, nil
786}
787
// ClusterConfig contains the information necessary to create a cluster.
//
// InstanceID names the instance CreateCluster creates the cluster in, and
// ClusterID is the ID given to the new cluster. Zone is combined with the
// project into the cluster's "projects/<project>/locations/<zone>" location.
// NumNodes is sent as the cluster's serve node count and StorageType as its
// default storage type.
type ClusterConfig struct {
	InstanceID, ClusterID, Zone string
	NumNodes                    int32
	StorageType                 StorageType
}
794
795func (cc *ClusterConfig) proto(project string) *btapb.Cluster {
796	return &btapb.Cluster{
797		ServeNodes:         cc.NumNodes,
798		DefaultStorageType: cc.StorageType.proto(),
799		Location:           "projects/" + project + "/locations/" + cc.Zone,
800	}
801}
802
// ClusterInfo represents information about a cluster.
//
// Name and Zone are the final path segments of the cluster's resource name
// and location. State is the string form of the cluster's state enum.
type ClusterInfo struct {
	Name       string // name of the cluster
	Zone       string // GCP zone of the cluster (e.g. "us-central1-a")
	ServeNodes int    // number of allocated serve nodes
	State      string // state of the cluster
}
810
811// CreateCluster creates a new cluster in an instance.
812// This method will return when the cluster has been created or when an error occurs.
813func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error {
814	ctx = mergeOutgoingMetadata(ctx, iac.md)
815
816	req := &btapb.CreateClusterRequest{
817		Parent:    "projects/" + iac.project + "/instances/" + conf.InstanceID,
818		ClusterId: conf.ClusterID,
819		Cluster:   conf.proto(iac.project),
820	}
821
822	lro, err := iac.iClient.CreateCluster(ctx, req)
823	if err != nil {
824		return err
825	}
826	resp := btapb.Cluster{}
827	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
828}
829
830// DeleteCluster deletes a cluster from an instance.
831func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error {
832	ctx = mergeOutgoingMetadata(ctx, iac.md)
833	req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
834	_, err := iac.iClient.DeleteCluster(ctx, req)
835	return err
836}
837
838// UpdateCluster updates attributes of a cluster
839func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error {
840	ctx = mergeOutgoingMetadata(ctx, iac.md)
841	cluster := &btapb.Cluster{
842		Name:       "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
843		ServeNodes: serveNodes}
844	lro, err := iac.iClient.UpdateCluster(ctx, cluster)
845	if err != nil {
846		return err
847	}
848	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
849}
850
851// Clusters lists the clusters in an instance.
852func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) {
853	ctx = mergeOutgoingMetadata(ctx, iac.md)
854	req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID}
855	var res *btapb.ListClustersResponse
856	err := gax.Invoke(ctx, func(ctx context.Context) error {
857		var err error
858		res, err = iac.iClient.ListClusters(ctx, req)
859		return err
860	}, retryOptions...)
861	if err != nil {
862		return nil, err
863	}
864	// TODO(garyelliott): Deal with failed_locations.
865	var cis []*ClusterInfo
866	for _, c := range res.Clusters {
867		nameParts := strings.Split(c.Name, "/")
868		locParts := strings.Split(c.Location, "/")
869		cis = append(cis, &ClusterInfo{
870			Name:       nameParts[len(nameParts)-1],
871			Zone:       locParts[len(locParts)-1],
872			ServeNodes: int(c.ServeNodes),
873			State:      c.State.String(),
874		})
875	}
876	return cis, nil
877}
878
879// GetCluster fetches a cluster in an instance
880func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) {
881	ctx = mergeOutgoingMetadata(ctx, iac.md)
882	req := &btapb.GetClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
883	var c *btapb.Cluster
884	err := gax.Invoke(ctx, func(ctx context.Context) error {
885		var err error
886		c, err = iac.iClient.GetCluster(ctx, req)
887		return err
888	}, retryOptions...)
889	if err != nil {
890		return nil, err
891	}
892
893	nameParts := strings.Split(c.Name, "/")
894	locParts := strings.Split(c.Location, "/")
895	cis := &ClusterInfo{
896		Name:       nameParts[len(nameParts)-1],
897		Zone:       locParts[len(locParts)-1],
898		ServeNodes: int(c.ServeNodes),
899		State:      c.State.String(),
900	}
901	return cis, nil
902}
903
904// InstanceIAM returns the instance's IAM handle.
905func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle {
906	return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID)
907
908}
909
// Routing policies.
const (
	// MultiClusterRouting is a policy that allows read/write requests to be
	// routed to any cluster in the instance. Requests will fail over to
	// another cluster in the event of transient errors or delays. Choosing
	// this option sacrifices read-your-writes consistency to improve
	// availability.
	MultiClusterRouting = "multi_cluster_routing_use_any"
	// SingleClusterRouting is a policy that unconditionally routes all
	// read/write requests to a specific cluster. This option preserves
	// read-your-writes consistency, but does not improve availability.
	SingleClusterRouting = "single_cluster_routing"
)
923
// ProfileConf contains the information necessary to create an app profile.
//
// CreateAppProfile reads InstanceID (the parent instance), ProfileID (the ID
// given to the new profile), Etag, Description, IgnoreWarnings and
// RoutingPolicy, which must be MultiClusterRouting or SingleClusterRouting.
// ClusterID and AllowTransactionalWrites are only sent when RoutingPolicy is
// SingleClusterRouting.
type ProfileConf struct {
	Name                     string
	ProfileID                string
	InstanceID               string
	Etag                     string
	Description              string
	RoutingPolicy            string
	ClusterID                string
	AllowTransactionalWrites bool

	// If true, warnings are ignored
	IgnoreWarnings bool
}
938
// ProfileIterator iterates over profiles.
//
// items buffers profiles that have been fetched but not yet returned by Next.
// pageInfo and nextFunc are the values produced by iterator.NewPageInfo in
// ListAppProfiles; nextFunc is called by Next before it reads from items.
type ProfileIterator struct {
	items    []*btapb.AppProfile
	pageInfo *iterator.PageInfo
	nextFunc func() error
}
945
// ProfileAttrsToUpdate defines the attrs to update during an UpdateAppProfile
// call. Only Description and RoutingPolicy, when set, contribute a path to the
// update field mask (see GetFieldMaskPath).
type ProfileAttrsToUpdate struct {
	// If set, updates the description.
	Description optional.String

	// If set, updates the routing policy.
	RoutingPolicy optional.String

	// If RoutingPolicy is updated to SingleClusterRouting, set these fields as well.
	ClusterID                string
	AllowTransactionalWrites bool

	// If true, warnings are ignored
	IgnoreWarnings bool
}
961
962// GetFieldMaskPath returns the field mask path.
963func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string {
964	path := make([]string, 0)
965	if p.Description != nil {
966		path = append(path, "description")
967	}
968
969	if p.RoutingPolicy != nil {
970		path = append(path, optional.ToString(p.RoutingPolicy))
971	}
972	return path
973}
974
// PageInfo returns the iterator's *iterator.PageInfo, which supports
// pagination. See the https://godoc.org/google.golang.org/api/iterator
// package for details.
func (it *ProfileIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
979
980// Next returns the next result. Its second return value is iterator.Done
981// (https://godoc.org/google.golang.org/api/iterator) if there are no more
982// results. Once Next returns Done, all subsequent calls will return Done.
983func (it *ProfileIterator) Next() (*btapb.AppProfile, error) {
984	if err := it.nextFunc(); err != nil {
985		return nil, err
986	}
987	item := it.items[0]
988	it.items = it.items[1:]
989	return item, nil
990}
991
992// CreateAppProfile creates an app profile within an instance.
993func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) {
994	ctx = mergeOutgoingMetadata(ctx, iac.md)
995	parent := "projects/" + iac.project + "/instances/" + profile.InstanceID
996	appProfile := &btapb.AppProfile{
997		Etag:        profile.Etag,
998		Description: profile.Description,
999	}
1000
1001	if profile.RoutingPolicy == "" {
1002		return nil, errors.New("invalid routing policy")
1003	}
1004
1005	switch profile.RoutingPolicy {
1006	case MultiClusterRouting:
1007		appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1008			MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1009		}
1010	case SingleClusterRouting:
1011		appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1012			SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1013				ClusterId:                profile.ClusterID,
1014				AllowTransactionalWrites: profile.AllowTransactionalWrites,
1015			},
1016		}
1017	default:
1018		return nil, errors.New("invalid routing policy")
1019	}
1020
1021	return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{
1022		Parent:         parent,
1023		AppProfile:     appProfile,
1024		AppProfileId:   profile.ProfileID,
1025		IgnoreWarnings: profile.IgnoreWarnings,
1026	})
1027}
1028
1029// GetAppProfile gets information about an app profile.
1030func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) {
1031	ctx = mergeOutgoingMetadata(ctx, iac.md)
1032	profileRequest := &btapb.GetAppProfileRequest{
1033		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1034	}
1035	var ap *btapb.AppProfile
1036	err := gax.Invoke(ctx, func(ctx context.Context) error {
1037		var err error
1038		ap, err = iac.iClient.GetAppProfile(ctx, profileRequest)
1039		return err
1040	}, retryOptions...)
1041	if err != nil {
1042		return nil, err
1043	}
1044	return ap, err
1045}
1046
1047// ListAppProfiles lists information about app profiles in an instance.
1048func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator {
1049	ctx = mergeOutgoingMetadata(ctx, iac.md)
1050	listRequest := &btapb.ListAppProfilesRequest{
1051		Parent: "projects/" + iac.project + "/instances/" + instanceID,
1052	}
1053
1054	pit := &ProfileIterator{}
1055	fetch := func(pageSize int, pageToken string) (string, error) {
1056		listRequest.PageToken = pageToken
1057		var profileRes *btapb.ListAppProfilesResponse
1058		err := gax.Invoke(ctx, func(ctx context.Context) error {
1059			var err error
1060			profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest)
1061			return err
1062		}, retryOptions...)
1063		if err != nil {
1064			return "", err
1065		}
1066
1067		pit.items = append(pit.items, profileRes.AppProfiles...)
1068		return profileRes.NextPageToken, nil
1069	}
1070
1071	bufLen := func() int { return len(pit.items) }
1072	takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b }
1073	pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
1074	return pit
1075
1076}
1077
1078// UpdateAppProfile updates an app profile within an instance.
1079// updateAttrs should be set. If unset, all fields will be replaced.
1080func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error {
1081	ctx = mergeOutgoingMetadata(ctx, iac.md)
1082
1083	profile := &btapb.AppProfile{
1084		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID,
1085	}
1086
1087	if updateAttrs.Description != nil {
1088		profile.Description = optional.ToString(updateAttrs.Description)
1089	}
1090	if updateAttrs.RoutingPolicy != nil {
1091		switch optional.ToString(updateAttrs.RoutingPolicy) {
1092		case MultiClusterRouting:
1093			profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1094				MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1095			}
1096		case SingleClusterRouting:
1097			profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1098				SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1099					ClusterId:                updateAttrs.ClusterID,
1100					AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites,
1101				},
1102			}
1103		default:
1104			return errors.New("invalid routing policy")
1105		}
1106	}
1107	patchRequest := &btapb.UpdateAppProfileRequest{
1108		AppProfile: profile,
1109		UpdateMask: &field_mask.FieldMask{
1110			Paths: updateAttrs.GetFieldMaskPath(),
1111		},
1112		IgnoreWarnings: updateAttrs.IgnoreWarnings,
1113	}
1114	updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest)
1115	if err != nil {
1116		return err
1117	}
1118
1119	return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil)
1120
1121}
1122
1123// DeleteAppProfile deletes an app profile from an instance.
1124func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error {
1125	ctx = mergeOutgoingMetadata(ctx, iac.md)
1126	deleteProfileRequest := &btapb.DeleteAppProfileRequest{
1127		Name:           "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1128		IgnoreWarnings: true,
1129	}
1130	_, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest)
1131	return err
1132
1133}
1134