1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"container/list"
21	"context"
22	"errors"
23	"fmt"
24	"math"
25	"regexp"
26	"strings"
27	"time"
28
29	btopt "cloud.google.com/go/bigtable/internal/option"
30	"cloud.google.com/go/iam"
31	"cloud.google.com/go/internal/optional"
32	"cloud.google.com/go/longrunning"
33	lroauto "cloud.google.com/go/longrunning/autogen"
34	"github.com/golang/protobuf/ptypes"
35	durpb "github.com/golang/protobuf/ptypes/duration"
36	gax "github.com/googleapis/gax-go/v2"
37	"google.golang.org/api/cloudresourcemanager/v1"
38	"google.golang.org/api/iterator"
39	"google.golang.org/api/option"
40	gtransport "google.golang.org/api/transport/grpc"
41	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
42	"google.golang.org/genproto/googleapis/rpc/status"
43	"google.golang.org/genproto/protobuf/field_mask"
44	"google.golang.org/grpc/metadata"
45)
46
47const adminAddr = "bigtableadmin.googleapis.com:443"
48const mtlsAdminAddr = "bigtableadmin.mtls.googleapis.com:443"
49
50// ErrPartiallyUnavailable is returned when some locations (clusters) are
51// unavailable. Both partial results (retrieved from available locations)
52// and the error are returned when this exception occurred.
53type ErrPartiallyUnavailable struct {
54	Locations []string // unavailable locations
55}
56
57func (e ErrPartiallyUnavailable) Error() string {
58	return fmt.Sprintf("Unavailable locations: %v", e.Locations)
59}
60
61// AdminClient is a client type for performing admin operations within a specific instance.
62type AdminClient struct {
63	connPool  gtransport.ConnPool
64	tClient   btapb.BigtableTableAdminClient
65	lroClient *lroauto.OperationsClient
66
67	project, instance string
68
69	// Metadata to be sent with each request.
70	md metadata.MD
71}
72
73// NewAdminClient creates a new AdminClient for a given project and instance.
74func NewAdminClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*AdminClient, error) {
75	o, err := btopt.DefaultClientOptions(adminAddr, mtlsAdminAddr, AdminScope, clientUserAgent)
76	if err != nil {
77		return nil, err
78	}
79	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
80	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
81	// Need to add scopes for long running operations (for create table & snapshots)
82	o = append(o, option.WithScopes(cloudresourcemanager.CloudPlatformScope))
83	o = append(o, opts...)
84	connPool, err := gtransport.DialPool(ctx, o...)
85	if err != nil {
86		return nil, fmt.Errorf("dialing: %v", err)
87	}
88
89	lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool))
90	if err != nil {
91		// This error "should not happen", since we are just reusing old connection
92		// and never actually need to dial.
93		// If this does happen, we could leak conn. However, we cannot close conn:
94		// If the user invoked the function with option.WithGRPCConn,
95		// we would close a connection that's still in use.
96		// TODO(pongad): investigate error conditions.
97		return nil, err
98	}
99
100	return &AdminClient{
101		connPool:  connPool,
102		tClient:   btapb.NewBigtableTableAdminClient(connPool),
103		lroClient: lroClient,
104		project:   project,
105		instance:  instance,
106		md:        metadata.Pairs(resourcePrefixHeader, fmt.Sprintf("projects/%s/instances/%s", project, instance)),
107	}, nil
108}
109
110// Close closes the AdminClient.
111func (ac *AdminClient) Close() error {
112	return ac.connPool.Close()
113}
114
115func (ac *AdminClient) instancePrefix() string {
116	return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
117}
118
119func (ac *AdminClient) backupPath(cluster, instance, backup string) string {
120	return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup)
121}
122
123// EncryptionInfo represents the encryption info of a table.
124type EncryptionInfo struct {
125	Status        *Status
126	Type          EncryptionType
127	KMSKeyVersion string
128}
129
130func newEncryptionInfo(pbInfo *btapb.EncryptionInfo) *EncryptionInfo {
131	return &EncryptionInfo{
132		Status:        pbInfo.EncryptionStatus,
133		Type:          EncryptionType(pbInfo.EncryptionType.Number()),
134		KMSKeyVersion: pbInfo.KmsKeyVersion,
135	}
136}
137
138// Status references google.golang.org/grpc/status.
139// It represents an RPC status code, message, and details of EncryptionInfo.
140// https://pkg.go.dev/google.golang.org/grpc/internal/status
141type Status = status.Status
142
143// EncryptionType is the type of encryption for an instance.
144type EncryptionType int32
145
146const (
147	// EncryptionTypeUnspecified is the type was not specified, though data at rest remains encrypted.
148	EncryptionTypeUnspecified EncryptionType = iota
149	// GoogleDefaultEncryption represents that data backing this resource is
150	// encrypted at rest with a key that is fully managed by Google. No key
151	// version or status will be populated. This is the default state.
152	GoogleDefaultEncryption
153	// CustomerManagedEncryption represents that data backing this resource is
154	// encrypted at rest with a key that is managed by the customer.
155	// The in-use version of the key and its status are populated for
156	// CMEK-protected tables.
157	// CMEK-protected backups are pinned to the key version that was in use at
158	// the time the backup was taken. This key version is populated but its
159	// status is not tracked and is reported as `UNKNOWN`.
160	CustomerManagedEncryption
161)
162
163// EncryptionInfoByCluster is a map of cluster name to EncryptionInfo
164type EncryptionInfoByCluster map[string][]*EncryptionInfo
165
166// EncryptionInfo gets the current encryption info for the table across all of the clusters.
167// The returned map will be keyed by cluster id and contain a status for all of the keys in use.
168func (ac *AdminClient) EncryptionInfo(ctx context.Context, table string) (EncryptionInfoByCluster, error) {
169	ctx = mergeOutgoingMetadata(ctx, ac.md)
170
171	res, err := ac.getTable(ctx, table, btapb.Table_ENCRYPTION_VIEW)
172	if err != nil {
173		return nil, err
174	}
175	encryptionInfo := EncryptionInfoByCluster{}
176	for key, cs := range res.ClusterStates {
177		for _, pbInfo := range cs.EncryptionInfo {
178			info := EncryptionInfo{}
179			info.Status = pbInfo.EncryptionStatus
180			info.Type = EncryptionType(pbInfo.EncryptionType.Number())
181			info.KMSKeyVersion = pbInfo.KmsKeyVersion
182			encryptionInfo[key] = append(encryptionInfo[key], &info)
183		}
184	}
185
186	return encryptionInfo, nil
187}
188
189// Tables returns a list of the tables in the instance.
190func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
191	ctx = mergeOutgoingMetadata(ctx, ac.md)
192	prefix := ac.instancePrefix()
193	req := &btapb.ListTablesRequest{
194		Parent: prefix,
195	}
196
197	var res *btapb.ListTablesResponse
198	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
199		var err error
200		res, err = ac.tClient.ListTables(ctx, req)
201		return err
202	}, retryOptions...)
203	if err != nil {
204		return nil, err
205	}
206
207	names := make([]string, 0, len(res.Tables))
208	for _, tbl := range res.Tables {
209		names = append(names, strings.TrimPrefix(tbl.Name, prefix+"/tables/"))
210	}
211	return names, nil
212}
213
214// TableConf contains all of the information necessary to create a table with column families.
215type TableConf struct {
216	TableID   string
217	SplitKeys []string
218	// Families is a map from family name to GCPolicy
219	Families map[string]GCPolicy
220}
221
222// CreateTable creates a new table in the instance.
223// This method may return before the table's creation is complete.
224func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
225	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table})
226}
227
228// CreatePresplitTable creates a new table in the instance.
229// The list of row keys will be used to initially split the table into multiple tablets.
230// Given two split keys, "s1" and "s2", three tablets will be created,
231// spanning the key ranges: [, s1), [s1, s2), [s2, ).
232// This method may return before the table's creation is complete.
233func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error {
234	return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys})
235}
236
237// CreateTableFromConf creates a new table in the instance from the given configuration.
238func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) error {
239	ctx = mergeOutgoingMetadata(ctx, ac.md)
240	var reqSplits []*btapb.CreateTableRequest_Split
241	for _, split := range conf.SplitKeys {
242		reqSplits = append(reqSplits, &btapb.CreateTableRequest_Split{Key: []byte(split)})
243	}
244	var tbl btapb.Table
245	if conf.Families != nil {
246		tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
247		for fam, policy := range conf.Families {
248			tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()}
249		}
250	}
251	prefix := ac.instancePrefix()
252	req := &btapb.CreateTableRequest{
253		Parent:        prefix,
254		TableId:       conf.TableID,
255		Table:         &tbl,
256		InitialSplits: reqSplits,
257	}
258	_, err := ac.tClient.CreateTable(ctx, req)
259	return err
260}
261
262// CreateColumnFamily creates a new column family in a table.
263func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error {
264	// TODO(dsymonds): Permit specifying gcexpr and any other family settings.
265	ctx = mergeOutgoingMetadata(ctx, ac.md)
266	prefix := ac.instancePrefix()
267	req := &btapb.ModifyColumnFamiliesRequest{
268		Name: prefix + "/tables/" + table,
269		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
270			Id:  family,
271			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
272		}},
273	}
274	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
275	return err
276}
277
278// DeleteTable deletes a table and all of its data.
279func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
280	ctx = mergeOutgoingMetadata(ctx, ac.md)
281	prefix := ac.instancePrefix()
282	req := &btapb.DeleteTableRequest{
283		Name: prefix + "/tables/" + table,
284	}
285	_, err := ac.tClient.DeleteTable(ctx, req)
286	return err
287}
288
289// DeleteColumnFamily deletes a column family in a table and all of its data.
290func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error {
291	ctx = mergeOutgoingMetadata(ctx, ac.md)
292	prefix := ac.instancePrefix()
293	req := &btapb.ModifyColumnFamiliesRequest{
294		Name: prefix + "/tables/" + table,
295		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
296			Id:  family,
297			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true},
298		}},
299	}
300	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
301	return err
302}
303
304// TableInfo represents information about a table.
305type TableInfo struct {
306	// DEPRECATED - This field is deprecated. Please use FamilyInfos instead.
307	Families    []string
308	FamilyInfos []FamilyInfo
309}
310
311// FamilyInfo represents information about a column family.
312type FamilyInfo struct {
313	Name     string
314	GCPolicy string
315}
316
317func (ac *AdminClient) getTable(ctx context.Context, table string, view btapb.Table_View) (*btapb.Table, error) {
318	ctx = mergeOutgoingMetadata(ctx, ac.md)
319	prefix := ac.instancePrefix()
320	req := &btapb.GetTableRequest{
321		Name: prefix + "/tables/" + table,
322		View: view,
323	}
324
325	var res *btapb.Table
326
327	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
328		var err error
329		res, err = ac.tClient.GetTable(ctx, req)
330		return err
331	}, retryOptions...)
332	if err != nil {
333		return nil, err
334	}
335	return res, nil
336}
337
338// TableInfo retrieves information about a table.
339func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) {
340	ctx = mergeOutgoingMetadata(ctx, ac.md)
341
342	res, err := ac.getTable(ctx, table, btapb.Table_SCHEMA_VIEW)
343	if err != nil {
344		return nil, err
345	}
346
347	ti := &TableInfo{}
348	for name, fam := range res.ColumnFamilies {
349		ti.Families = append(ti.Families, name)
350		ti.FamilyInfos = append(ti.FamilyInfos, FamilyInfo{Name: name, GCPolicy: GCRuleToString(fam.GcRule)})
351	}
352	return ti, nil
353}
354
355// SetGCPolicy specifies which cells in a column family should be garbage collected.
356// GC executes opportunistically in the background; table reads may return data
357// matching the GC policy.
358func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
359	ctx = mergeOutgoingMetadata(ctx, ac.md)
360	prefix := ac.instancePrefix()
361	req := &btapb.ModifyColumnFamiliesRequest{
362		Name: prefix + "/tables/" + table,
363		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
364			Id:  family,
365			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}},
366		}},
367	}
368	_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
369	return err
370}
371
372// DropRowRange permanently deletes a row range from the specified table.
373func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error {
374	ctx = mergeOutgoingMetadata(ctx, ac.md)
375	prefix := ac.instancePrefix()
376	req := &btapb.DropRowRangeRequest{
377		Name:   prefix + "/tables/" + table,
378		Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte(rowKeyPrefix)},
379	}
380	_, err := ac.tClient.DropRowRange(ctx, req)
381	return err
382}
383
384// DropAllRows permanently deletes all rows from the specified table.
385func (ac *AdminClient) DropAllRows(ctx context.Context, table string) error {
386	ctx = mergeOutgoingMetadata(ctx, ac.md)
387	prefix := ac.instancePrefix()
388	req := &btapb.DropRowRangeRequest{
389		Name:   prefix + "/tables/" + table,
390		Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true},
391	}
392	_, err := ac.tClient.DropRowRange(ctx, req)
393	return err
394}
395
396// CreateTableFromSnapshot creates a table from snapshot.
397// The table will be created in the same cluster as the snapshot.
398//
399// This is a private alpha release of Cloud Bigtable snapshots. This feature
400// is not currently available to most Cloud Bigtable customers. This feature
401// might be changed in backward-incompatible ways and is not recommended for
402// production use. It is not subject to any SLA or deprecation policy.
403func (ac *AdminClient) CreateTableFromSnapshot(ctx context.Context, table, cluster, snapshot string) error {
404	ctx = mergeOutgoingMetadata(ctx, ac.md)
405	prefix := ac.instancePrefix()
406	snapshotPath := prefix + "/clusters/" + cluster + "/snapshots/" + snapshot
407
408	req := &btapb.CreateTableFromSnapshotRequest{
409		Parent:         prefix,
410		TableId:        table,
411		SourceSnapshot: snapshotPath,
412	}
413	op, err := ac.tClient.CreateTableFromSnapshot(ctx, req)
414	if err != nil {
415		return err
416	}
417	resp := btapb.Table{}
418	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
419}
420
421// DefaultSnapshotDuration is the default TTL for a snapshot.
422const DefaultSnapshotDuration time.Duration = 0
423
424// SnapshotTable creates a new snapshot in the specified cluster from the
425// specified source table. Setting the TTL to `DefaultSnapshotDuration` will
426// use the server side default for the duration.
427//
428// This is a private alpha release of Cloud Bigtable snapshots. This feature
429// is not currently available to most Cloud Bigtable customers. This feature
430// might be changed in backward-incompatible ways and is not recommended for
431// production use. It is not subject to any SLA or deprecation policy.
432func (ac *AdminClient) SnapshotTable(ctx context.Context, table, cluster, snapshot string, ttl time.Duration) error {
433	ctx = mergeOutgoingMetadata(ctx, ac.md)
434	prefix := ac.instancePrefix()
435
436	var ttlProto *durpb.Duration
437
438	if ttl > 0 {
439		ttlProto = ptypes.DurationProto(ttl)
440	}
441
442	req := &btapb.SnapshotTableRequest{
443		Name:       prefix + "/tables/" + table,
444		Cluster:    prefix + "/clusters/" + cluster,
445		SnapshotId: snapshot,
446		Ttl:        ttlProto,
447	}
448
449	op, err := ac.tClient.SnapshotTable(ctx, req)
450	if err != nil {
451		return err
452	}
453	resp := btapb.Snapshot{}
454	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
455}
456
457// Snapshots returns a SnapshotIterator for iterating over the snapshots in a cluster.
458// To list snapshots across all of the clusters in the instance specify "-" as the cluster.
459//
460// This is a private alpha release of Cloud Bigtable snapshots. This feature is not
461// currently available to most Cloud Bigtable customers. This feature might be
462// changed in backward-incompatible ways and is not recommended for production use.
463// It is not subject to any SLA or deprecation policy.
464func (ac *AdminClient) Snapshots(ctx context.Context, cluster string) *SnapshotIterator {
465	ctx = mergeOutgoingMetadata(ctx, ac.md)
466	prefix := ac.instancePrefix()
467	clusterPath := prefix + "/clusters/" + cluster
468
469	it := &SnapshotIterator{}
470	req := &btapb.ListSnapshotsRequest{
471		Parent: clusterPath,
472	}
473
474	fetch := func(pageSize int, pageToken string) (string, error) {
475		req.PageToken = pageToken
476		if pageSize > math.MaxInt32 {
477			req.PageSize = math.MaxInt32
478		} else {
479			req.PageSize = int32(pageSize)
480		}
481
482		var resp *btapb.ListSnapshotsResponse
483		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
484			var err error
485			resp, err = ac.tClient.ListSnapshots(ctx, req)
486			return err
487		}, retryOptions...)
488		if err != nil {
489			return "", err
490		}
491		for _, s := range resp.Snapshots {
492			snapshotInfo, err := newSnapshotInfo(s)
493			if err != nil {
494				return "", fmt.Errorf("failed to parse snapshot proto %v", err)
495			}
496			it.items = append(it.items, snapshotInfo)
497		}
498		return resp.NextPageToken, nil
499	}
500	bufLen := func() int { return len(it.items) }
501	takeBuf := func() interface{} { b := it.items; it.items = nil; return b }
502
503	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
504
505	return it
506}
507
508func newSnapshotInfo(snapshot *btapb.Snapshot) (*SnapshotInfo, error) {
509	nameParts := strings.Split(snapshot.Name, "/")
510	name := nameParts[len(nameParts)-1]
511	tablePathParts := strings.Split(snapshot.SourceTable.Name, "/")
512	tableID := tablePathParts[len(tablePathParts)-1]
513
514	createTime, err := ptypes.Timestamp(snapshot.CreateTime)
515	if err != nil {
516		return nil, fmt.Errorf("invalid createTime: %v", err)
517	}
518
519	deleteTime, err := ptypes.Timestamp(snapshot.DeleteTime)
520	if err != nil {
521		return nil, fmt.Errorf("invalid deleteTime: %v", err)
522	}
523
524	return &SnapshotInfo{
525		Name:        name,
526		SourceTable: tableID,
527		DataSize:    snapshot.DataSizeBytes,
528		CreateTime:  createTime,
529		DeleteTime:  deleteTime,
530	}, nil
531}
532
533// SnapshotIterator is an EntryIterator that iterates over log entries.
534//
535// This is a private alpha release of Cloud Bigtable snapshots. This feature
536// is not currently available to most Cloud Bigtable customers. This feature
537// might be changed in backward-incompatible ways and is not recommended for
538// production use. It is not subject to any SLA or deprecation policy.
539type SnapshotIterator struct {
540	items    []*SnapshotInfo
541	pageInfo *iterator.PageInfo
542	nextFunc func() error
543}
544
545// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
546func (it *SnapshotIterator) PageInfo() *iterator.PageInfo {
547	return it.pageInfo
548}
549
550// Next returns the next result. Its second return value is iterator.Done
551// (https://godoc.org/google.golang.org/api/iterator) if there are no more
552// results. Once Next returns Done, all subsequent calls will return Done.
553func (it *SnapshotIterator) Next() (*SnapshotInfo, error) {
554	if err := it.nextFunc(); err != nil {
555		return nil, err
556	}
557	item := it.items[0]
558	it.items = it.items[1:]
559	return item, nil
560}
561
562// SnapshotInfo contains snapshot metadata.
563type SnapshotInfo struct {
564	Name        string
565	SourceTable string
566	DataSize    int64
567	CreateTime  time.Time
568	DeleteTime  time.Time
569}
570
571// SnapshotInfo gets snapshot metadata.
572//
573// This is a private alpha release of Cloud Bigtable snapshots. This feature
574// is not currently available to most Cloud Bigtable customers. This feature
575// might be changed in backward-incompatible ways and is not recommended for
576// production use. It is not subject to any SLA or deprecation policy.
577func (ac *AdminClient) SnapshotInfo(ctx context.Context, cluster, snapshot string) (*SnapshotInfo, error) {
578	ctx = mergeOutgoingMetadata(ctx, ac.md)
579	prefix := ac.instancePrefix()
580	clusterPath := prefix + "/clusters/" + cluster
581	snapshotPath := clusterPath + "/snapshots/" + snapshot
582
583	req := &btapb.GetSnapshotRequest{
584		Name: snapshotPath,
585	}
586
587	var resp *btapb.Snapshot
588	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
589		var err error
590		resp, err = ac.tClient.GetSnapshot(ctx, req)
591		return err
592	}, retryOptions...)
593	if err != nil {
594		return nil, err
595	}
596
597	return newSnapshotInfo(resp)
598}
599
600// DeleteSnapshot deletes a snapshot in a cluster.
601//
602// This is a private alpha release of Cloud Bigtable snapshots. This feature
603// is not currently available to most Cloud Bigtable customers. This feature
604// might be changed in backward-incompatible ways and is not recommended for
605// production use. It is not subject to any SLA or deprecation policy.
606func (ac *AdminClient) DeleteSnapshot(ctx context.Context, cluster, snapshot string) error {
607	ctx = mergeOutgoingMetadata(ctx, ac.md)
608	prefix := ac.instancePrefix()
609	clusterPath := prefix + "/clusters/" + cluster
610	snapshotPath := clusterPath + "/snapshots/" + snapshot
611
612	req := &btapb.DeleteSnapshotRequest{
613		Name: snapshotPath,
614	}
615	_, err := ac.tClient.DeleteSnapshot(ctx, req)
616	return err
617}
618
619// getConsistencyToken gets the consistency token for a table.
620func (ac *AdminClient) getConsistencyToken(ctx context.Context, tableName string) (string, error) {
621	req := &btapb.GenerateConsistencyTokenRequest{
622		Name: tableName,
623	}
624	resp, err := ac.tClient.GenerateConsistencyToken(ctx, req)
625	if err != nil {
626		return "", err
627	}
628	return resp.GetConsistencyToken(), nil
629}
630
631// isConsistent checks if a token is consistent for a table.
632func (ac *AdminClient) isConsistent(ctx context.Context, tableName, token string) (bool, error) {
633	req := &btapb.CheckConsistencyRequest{
634		Name:             tableName,
635		ConsistencyToken: token,
636	}
637	var resp *btapb.CheckConsistencyResponse
638
639	// Retry calls on retryable errors to avoid losing the token gathered before.
640	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
641		var err error
642		resp, err = ac.tClient.CheckConsistency(ctx, req)
643		return err
644	}, retryOptions...)
645	if err != nil {
646		return false, err
647	}
648	return resp.GetConsistent(), nil
649}
650
651// WaitForReplication waits until all the writes committed before the call started have been propagated to all the clusters in the instance via replication.
652func (ac *AdminClient) WaitForReplication(ctx context.Context, table string) error {
653	ctx = mergeOutgoingMetadata(ctx, ac.md)
654	// Get the token.
655	prefix := ac.instancePrefix()
656	tableName := prefix + "/tables/" + table
657	token, err := ac.getConsistencyToken(ctx, tableName)
658	if err != nil {
659		return err
660	}
661
662	// Periodically check if the token is consistent.
663	timer := time.NewTicker(time.Second * 10)
664	defer timer.Stop()
665	for {
666		consistent, err := ac.isConsistent(ctx, tableName, token)
667		if err != nil {
668			return err
669		}
670		if consistent {
671			return nil
672		}
673		// Sleep for a bit or until the ctx is cancelled.
674		select {
675		case <-ctx.Done():
676			return ctx.Err()
677		case <-timer.C:
678		}
679	}
680}
681
682// TableIAM creates an IAM Handle specific to a given Instance and Table within the configured project.
683func (ac *AdminClient) TableIAM(tableID string) *iam.Handle {
684	return iam.InternalNewHandleGRPCClient(ac.tClient,
685		"projects/"+ac.project+"/instances/"+ac.instance+"/tables/"+tableID)
686}
687
688// BackupIAM creates an IAM Handle specific to a given Cluster and Backup.
689func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
690	return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup))
691}
692
693const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
694const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"
695
696// InstanceAdminClient is a client type for performing admin operations on instances.
697// These operations can be substantially more dangerous than those provided by AdminClient.
698type InstanceAdminClient struct {
699	connPool  gtransport.ConnPool
700	iClient   btapb.BigtableInstanceAdminClient
701	lroClient *lroauto.OperationsClient
702
703	project string
704
705	// Metadata to be sent with each request.
706	md metadata.MD
707}
708
709// NewInstanceAdminClient creates a new InstanceAdminClient for a given project.
710func NewInstanceAdminClient(ctx context.Context, project string, opts ...option.ClientOption) (*InstanceAdminClient, error) {
711	o, err := btopt.DefaultClientOptions(instanceAdminAddr, mtlsInstanceAdminAddr, InstanceAdminScope, clientUserAgent)
712	if err != nil {
713		return nil, err
714	}
715	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
716	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
717	o = append(o, opts...)
718	connPool, err := gtransport.DialPool(ctx, o...)
719	if err != nil {
720		return nil, fmt.Errorf("dialing: %v", err)
721	}
722
723	lroClient, err := lroauto.NewOperationsClient(ctx, gtransport.WithConnPool(connPool))
724	if err != nil {
725		// This error "should not happen", since we are just reusing old connection
726		// and never actually need to dial.
727		// If this does happen, we could leak conn. However, we cannot close conn:
728		// If the user invoked the function with option.WithGRPCConn,
729		// we would close a connection that's still in use.
730		// TODO(pongad): investigate error conditions.
731		return nil, err
732	}
733
734	return &InstanceAdminClient{
735		connPool:  connPool,
736		iClient:   btapb.NewBigtableInstanceAdminClient(connPool),
737		lroClient: lroClient,
738
739		project: project,
740		md:      metadata.Pairs(resourcePrefixHeader, "projects/"+project),
741	}, nil
742}
743
744// Close closes the InstanceAdminClient.
745func (iac *InstanceAdminClient) Close() error {
746	return iac.connPool.Close()
747}
748
749// StorageType is the type of storage used for all tables in an instance
750type StorageType int
751
752const (
753	SSD StorageType = iota
754	HDD
755)
756
757func (st StorageType) proto() btapb.StorageType {
758	if st == HDD {
759		return btapb.StorageType_HDD
760	}
761	return btapb.StorageType_SSD
762}
763
764func storageTypeFromProto(st btapb.StorageType) StorageType {
765	if st == btapb.StorageType_HDD {
766		return HDD
767	}
768
769	return SSD
770}
771
772// InstanceState is the state of the instance. This is output-only.
773type InstanceState int32
774
775const (
776	// NotKnown represents the state of an instance that could not be determined.
777	NotKnown InstanceState = InstanceState(btapb.Instance_STATE_NOT_KNOWN)
778	// Ready represents the state of an instance that has been successfully created.
779	Ready = InstanceState(btapb.Instance_READY)
780	// Creating represents the state of an instance that is currently being created.
781	Creating = InstanceState(btapb.Instance_CREATING)
782)
783
784// InstanceType is the type of the instance.
785type InstanceType int32
786
787const (
788	// UNSPECIFIED instance types default to PRODUCTION
789	UNSPECIFIED InstanceType = InstanceType(btapb.Instance_TYPE_UNSPECIFIED)
790	PRODUCTION               = InstanceType(btapb.Instance_PRODUCTION)
791	DEVELOPMENT              = InstanceType(btapb.Instance_DEVELOPMENT)
792)
793
794// InstanceInfo represents information about an instance
795type InstanceInfo struct {
796	Name          string // name of the instance
797	DisplayName   string // display name for UIs
798	InstanceState InstanceState
799	InstanceType  InstanceType
800	Labels        map[string]string
801}
802
803// InstanceConf contains the information necessary to create an Instance
804type InstanceConf struct {
805	InstanceId, DisplayName, ClusterId, Zone string
806	// NumNodes must not be specified for DEVELOPMENT instance types
807	NumNodes     int32
808	StorageType  StorageType
809	InstanceType InstanceType
810	Labels       map[string]string
811}
812
813// InstanceWithClustersConfig contains the information necessary to create an Instance
814type InstanceWithClustersConfig struct {
815	InstanceID, DisplayName string
816	Clusters                []ClusterConfig
817	InstanceType            InstanceType
818	Labels                  map[string]string
819}
820
821var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][-a-z0-9]*)$`)
822
823// CreateInstance creates a new instance in the project.
824// This method will return when the instance has been created or when an error occurs.
825func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
826	ctx = mergeOutgoingMetadata(ctx, iac.md)
827	newConfig := InstanceWithClustersConfig{
828		InstanceID:   conf.InstanceId,
829		DisplayName:  conf.DisplayName,
830		InstanceType: conf.InstanceType,
831		Labels:       conf.Labels,
832		Clusters: []ClusterConfig{
833			{
834				InstanceID:  conf.InstanceId,
835				ClusterID:   conf.ClusterId,
836				Zone:        conf.Zone,
837				NumNodes:    conf.NumNodes,
838				StorageType: conf.StorageType,
839			},
840		},
841	}
842	return iac.CreateInstanceWithClusters(ctx, &newConfig)
843}
844
845// CreateInstanceWithClusters creates a new instance with configured clusters in the project.
846// This method will return when the instance has been created or when an error occurs.
847func (iac *InstanceAdminClient) CreateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
848	ctx = mergeOutgoingMetadata(ctx, iac.md)
849	clusters := make(map[string]*btapb.Cluster)
850	for _, cluster := range conf.Clusters {
851		clusters[cluster.ClusterID] = cluster.proto(iac.project)
852	}
853
854	req := &btapb.CreateInstanceRequest{
855		Parent:     "projects/" + iac.project,
856		InstanceId: conf.InstanceID,
857		Instance: &btapb.Instance{
858			DisplayName: conf.DisplayName,
859			Type:        btapb.Instance_Type(conf.InstanceType),
860			Labels:      conf.Labels,
861		},
862		Clusters: clusters,
863	}
864
865	lro, err := iac.iClient.CreateInstance(ctx, req)
866	if err != nil {
867		return err
868	}
869	resp := btapb.Instance{}
870	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
871}
872
873// updateInstance updates a single instance based on config fields that operate
874// at an instance level: DisplayName and InstanceType.
875func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *InstanceWithClustersConfig) (updated bool, err error) {
876	if conf.InstanceID == "" {
877		return false, errors.New("InstanceID is required")
878	}
879
880	// Update the instance, if necessary
881	mask := &field_mask.FieldMask{}
882	ireq := &btapb.PartialUpdateInstanceRequest{
883		Instance: &btapb.Instance{
884			Name: "projects/" + iac.project + "/instances/" + conf.InstanceID,
885		},
886		UpdateMask: mask,
887	}
888	if conf.DisplayName != "" {
889		ireq.Instance.DisplayName = conf.DisplayName
890		mask.Paths = append(mask.Paths, "display_name")
891	}
892	if btapb.Instance_Type(conf.InstanceType) != btapb.Instance_TYPE_UNSPECIFIED {
893		ireq.Instance.Type = btapb.Instance_Type(conf.InstanceType)
894		mask.Paths = append(mask.Paths, "type")
895	}
896	if conf.Labels != nil {
897		ireq.Instance.Labels = conf.Labels
898		mask.Paths = append(mask.Paths, "labels")
899	}
900
901	if len(mask.Paths) == 0 {
902		return false, nil
903	}
904
905	lro, err := iac.iClient.PartialUpdateInstance(ctx, ireq)
906	if err != nil {
907		return false, err
908	}
909	err = longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
910	if err != nil {
911		return false, err
912	}
913
914	return true, nil
915}
916
917// UpdateInstanceWithClusters updates an instance and its clusters. Updateable
918// fields are instance display name, instance type and cluster size.
919// The provided InstanceWithClustersConfig is used as follows:
920// - InstanceID is required
921// - DisplayName and InstanceType are updated only if they are not empty
922// - ClusterID is required for any provided cluster
923// - All other cluster fields are ignored except for NumNodes, which if set will be updated
924//
925// This method may return an error after partially succeeding, for example if the instance is updated
926// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
927// determine the current state.
928func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context, conf *InstanceWithClustersConfig) error {
929	ctx = mergeOutgoingMetadata(ctx, iac.md)
930
931	for _, cluster := range conf.Clusters {
932		if cluster.ClusterID == "" {
933			return errors.New("ClusterID is required for every cluster")
934		}
935	}
936
937	updatedInstance, err := iac.updateInstance(ctx, conf)
938	if err != nil {
939		return err
940	}
941
942	// Update any clusters
943	for _, cluster := range conf.Clusters {
944		err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
945		if err != nil {
946			if updatedInstance {
947				// We updated the instance, so note that in the error message.
948				return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded",
949					cluster.ClusterID, err)
950			}
951			return err
952		}
953	}
954
955	return nil
956}
957
958// DeleteInstance deletes an instance from the project.
959func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceID string) error {
960	ctx = mergeOutgoingMetadata(ctx, iac.md)
961	req := &btapb.DeleteInstanceRequest{Name: "projects/" + iac.project + "/instances/" + instanceID}
962	_, err := iac.iClient.DeleteInstance(ctx, req)
963	return err
964}
965
966// Instances returns a list of instances in the project. If any location
967// (cluster) is unavailable due to some transient conditions, Instances
968// returns partial results and ErrPartiallyUnavailable error with
969// unavailable locations list.
970func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) {
971	ctx = mergeOutgoingMetadata(ctx, iac.md)
972	req := &btapb.ListInstancesRequest{
973		Parent: "projects/" + iac.project,
974	}
975	var res *btapb.ListInstancesResponse
976	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
977		var err error
978		res, err = iac.iClient.ListInstances(ctx, req)
979		return err
980	}, retryOptions...)
981	if err != nil {
982		return nil, err
983	}
984
985	var is []*InstanceInfo
986	for _, i := range res.Instances {
987		m := instanceNameRegexp.FindStringSubmatch(i.Name)
988		if m == nil {
989			return nil, fmt.Errorf("malformed instance name %q", i.Name)
990		}
991		is = append(is, &InstanceInfo{
992			Name:          m[2],
993			DisplayName:   i.DisplayName,
994			InstanceState: InstanceState(i.State),
995			InstanceType:  InstanceType(i.Type),
996			Labels:        i.Labels,
997		})
998	}
999	if len(res.FailedLocations) > 0 {
1000		// Return partial results and an error in
1001		// case of some locations are unavailable.
1002		return is, ErrPartiallyUnavailable{res.FailedLocations}
1003	}
1004	return is, nil
1005}
1006
1007// InstanceInfo returns information about an instance.
1008func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID string) (*InstanceInfo, error) {
1009	ctx = mergeOutgoingMetadata(ctx, iac.md)
1010	req := &btapb.GetInstanceRequest{
1011		Name: "projects/" + iac.project + "/instances/" + instanceID,
1012	}
1013	var res *btapb.Instance
1014	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1015		var err error
1016		res, err = iac.iClient.GetInstance(ctx, req)
1017		return err
1018	}, retryOptions...)
1019	if err != nil {
1020		return nil, err
1021	}
1022
1023	m := instanceNameRegexp.FindStringSubmatch(res.Name)
1024	if m == nil {
1025		return nil, fmt.Errorf("malformed instance name %q", res.Name)
1026	}
1027	return &InstanceInfo{
1028		Name:          m[2],
1029		DisplayName:   res.DisplayName,
1030		InstanceState: InstanceState(res.State),
1031		InstanceType:  InstanceType(res.Type),
1032		Labels:        res.Labels,
1033	}, nil
1034}
1035
1036// ClusterConfig contains the information necessary to create a cluster
1037type ClusterConfig struct {
1038	// InstanceID specifies the unique name of the instance. Required.
1039	InstanceID string
1040
1041	// ClusterID specifies the unique name of the cluster. Required.
1042	ClusterID string
1043
1044	// Zone specifies the location where this cluster's nodes and storage reside.
1045	// For best performance, clients should be located as close as possible to this
1046	// cluster. Required.
1047	Zone string
1048
1049	// NumNodes specifies the number of nodes allocated to this cluster. More
1050	// nodes enable higher throughput and more consistent performance. Required.
1051	NumNodes int32
1052
1053	// StorageType specifies the type of storage used by this cluster to serve
1054	// its parent instance's tables, unless explicitly overridden. Required.
1055	StorageType StorageType
1056
1057	// KMSKeyName is the name of the KMS customer managed encryption key (CMEK)
1058	// to use for at-rest encryption of data in this cluster.  If omitted,
1059	// Google's default encryption will be used. If specified, the requirements
1060	// for this key are:
1061	// 1) The Cloud Bigtable service account associated with the
1062	//    project that contains the cluster must be granted the
1063	//    ``cloudkms.cryptoKeyEncrypterDecrypter`` role on the
1064	//    CMEK.
1065	// 2) Only regional keys can be used and the region of the
1066	//    CMEK key must match the region of the cluster.
1067	// 3) All clusters within an instance must use the same CMEK
1068	//    key.
1069	// Optional. Immutable.
1070	KMSKeyName string
1071}
1072
1073func (cc *ClusterConfig) proto(project string) *btapb.Cluster {
1074	ec := btapb.Cluster_EncryptionConfig{}
1075	ec.KmsKeyName = cc.KMSKeyName
1076	return &btapb.Cluster{
1077		ServeNodes:         cc.NumNodes,
1078		DefaultStorageType: cc.StorageType.proto(),
1079		Location:           "projects/" + project + "/locations/" + cc.Zone,
1080		EncryptionConfig:   &ec,
1081	}
1082}
1083
1084// ClusterInfo represents information about a cluster.
1085type ClusterInfo struct {
1086	// Name is the name of the cluster.
1087	Name string
1088
1089	// Zone is the GCP zone of the cluster (e.g. "us-central1-a").
1090	Zone string
1091
1092	// ServeNodes is the number of allocated serve nodes.
1093	ServeNodes int
1094
1095	// State is the state of the cluster.
1096	State string
1097
1098	// StorageType is the storage type of the cluster.
1099	StorageType StorageType
1100
1101	// KMSKeyName is the customer managed encryption key for the cluster.
1102	KMSKeyName string
1103}
1104
1105// CreateCluster creates a new cluster in an instance.
1106// This method will return when the cluster has been created or when an error occurs.
1107func (iac *InstanceAdminClient) CreateCluster(ctx context.Context, conf *ClusterConfig) error {
1108	ctx = mergeOutgoingMetadata(ctx, iac.md)
1109
1110	req := &btapb.CreateClusterRequest{
1111		Parent:    "projects/" + iac.project + "/instances/" + conf.InstanceID,
1112		ClusterId: conf.ClusterID,
1113		Cluster:   conf.proto(iac.project),
1114	}
1115
1116	lro, err := iac.iClient.CreateCluster(ctx, req)
1117	if err != nil {
1118		return err
1119	}
1120	resp := btapb.Cluster{}
1121	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, &resp)
1122}
1123
1124// DeleteCluster deletes a cluster from an instance.
1125func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, clusterID string) error {
1126	ctx = mergeOutgoingMetadata(ctx, iac.md)
1127	req := &btapb.DeleteClusterRequest{Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID}
1128	_, err := iac.iClient.DeleteCluster(ctx, req)
1129	return err
1130}
1131
1132// UpdateCluster updates attributes of a cluster
1133func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error {
1134	ctx = mergeOutgoingMetadata(ctx, iac.md)
1135	cluster := &btapb.Cluster{
1136		Name:       "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
1137		ServeNodes: serveNodes}
1138	lro, err := iac.iClient.UpdateCluster(ctx, cluster)
1139	if err != nil {
1140		return err
1141	}
1142	return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
1143}
1144
1145// Clusters lists the clusters in an instance. If any location
1146// (cluster) is unavailable due to some transient conditions, Clusters
1147// returns partial results and ErrPartiallyUnavailable error with
1148// unavailable locations list.
1149func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string) ([]*ClusterInfo, error) {
1150	ctx = mergeOutgoingMetadata(ctx, iac.md)
1151	req := &btapb.ListClustersRequest{Parent: "projects/" + iac.project + "/instances/" + instanceID}
1152	var res *btapb.ListClustersResponse
1153	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1154		var err error
1155		res, err = iac.iClient.ListClusters(ctx, req)
1156		return err
1157	}, retryOptions...)
1158	if err != nil {
1159		return nil, err
1160	}
1161
1162	var cis []*ClusterInfo
1163	for _, c := range res.Clusters {
1164		nameParts := strings.Split(c.Name, "/")
1165		locParts := strings.Split(c.Location, "/")
1166		kmsKeyName := ""
1167		if c.EncryptionConfig != nil {
1168			kmsKeyName = c.EncryptionConfig.KmsKeyName
1169		}
1170		cis = append(cis, &ClusterInfo{
1171			Name:        nameParts[len(nameParts)-1],
1172			Zone:        locParts[len(locParts)-1],
1173			ServeNodes:  int(c.ServeNodes),
1174			State:       c.State.String(),
1175			StorageType: storageTypeFromProto(c.DefaultStorageType),
1176			KMSKeyName:  kmsKeyName,
1177		})
1178	}
1179	if len(res.FailedLocations) > 0 {
1180		// Return partial results and an error in
1181		// case of some locations are unavailable.
1182		return cis, ErrPartiallyUnavailable{res.FailedLocations}
1183	}
1184	return cis, nil
1185}
1186
1187// GetCluster fetches a cluster in an instance
1188func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clusterID string) (*ClusterInfo, error) {
1189	ctx = mergeOutgoingMetadata(ctx, iac.md)
1190	req := &btapb.GetClusterRequest{
1191		Name: fmt.Sprintf("projects/%s/instances/%s/clusters/%s", iac.project, instanceID, clusterID),
1192	}
1193	var c *btapb.Cluster
1194	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1195		var err error
1196		c, err = iac.iClient.GetCluster(ctx, req)
1197		return err
1198	}, retryOptions...)
1199	if err != nil {
1200		return nil, err
1201	}
1202
1203	kmsKeyName := ""
1204	if c.EncryptionConfig != nil {
1205		kmsKeyName = c.EncryptionConfig.KmsKeyName
1206	}
1207	nameParts := strings.Split(c.Name, "/")
1208	locParts := strings.Split(c.Location, "/")
1209	cis := &ClusterInfo{
1210		Name:        nameParts[len(nameParts)-1],
1211		Zone:        locParts[len(locParts)-1],
1212		ServeNodes:  int(c.ServeNodes),
1213		State:       c.State.String(),
1214		StorageType: storageTypeFromProto(c.DefaultStorageType),
1215		KMSKeyName:  kmsKeyName,
1216	}
1217	return cis, nil
1218}
1219
1220// InstanceIAM returns the instance's IAM handle.
1221func (iac *InstanceAdminClient) InstanceIAM(instanceID string) *iam.Handle {
1222	return iam.InternalNewHandleGRPCClient(iac.iClient, "projects/"+iac.project+"/instances/"+instanceID)
1223}
1224
1225// Routing policies.
1226const (
1227	// MultiClusterRouting is a policy that allows read/write requests to be
1228	// routed to any cluster in the instance. Requests will will fail over to
1229	// another cluster in the event of transient errors or delays. Choosing
1230	// this option sacrifices read-your-writes consistency to improve
1231	// availability.
1232	MultiClusterRouting = "multi_cluster_routing_use_any"
1233	// SingleClusterRouting is a policy that unconditionally routes all
1234	// read/write requests to a specific cluster. This option preserves
1235	// read-your-writes consistency, but does not improve availability.
1236	SingleClusterRouting = "single_cluster_routing"
1237)
1238
1239// ProfileConf contains the information necessary to create an profile
1240type ProfileConf struct {
1241	Name                     string
1242	ProfileID                string
1243	InstanceID               string
1244	Etag                     string
1245	Description              string
1246	RoutingPolicy            string
1247	ClusterID                string
1248	AllowTransactionalWrites bool
1249
1250	// If true, warnings are ignored
1251	IgnoreWarnings bool
1252}
1253
1254// ProfileIterator iterates over profiles.
1255type ProfileIterator struct {
1256	items    []*btapb.AppProfile
1257	pageInfo *iterator.PageInfo
1258	nextFunc func() error
1259}
1260
1261// ProfileAttrsToUpdate define addrs to update during an Update call. If unset, no fields will be replaced.
1262type ProfileAttrsToUpdate struct {
1263	// If set, updates the description.
1264	Description optional.String
1265
1266	//If set, updates the routing policy.
1267	RoutingPolicy optional.String
1268
1269	//If RoutingPolicy is updated to SingleClusterRouting, set these fields as well.
1270	ClusterID                string
1271	AllowTransactionalWrites bool
1272
1273	// If true, warnings are ignored
1274	IgnoreWarnings bool
1275}
1276
1277// GetFieldMaskPath returns the field mask path.
1278func (p *ProfileAttrsToUpdate) GetFieldMaskPath() []string {
1279	path := make([]string, 0)
1280	if p.Description != nil {
1281		path = append(path, "description")
1282	}
1283
1284	if p.RoutingPolicy != nil {
1285		path = append(path, optional.ToString(p.RoutingPolicy))
1286	}
1287	return path
1288}
1289
1290// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
1291func (it *ProfileIterator) PageInfo() *iterator.PageInfo {
1292	return it.pageInfo
1293}
1294
1295// Next returns the next result. Its second return value is iterator.Done
1296// (https://godoc.org/google.golang.org/api/iterator) if there are no more
1297// results. Once Next returns Done, all subsequent calls will return Done.
1298func (it *ProfileIterator) Next() (*btapb.AppProfile, error) {
1299	if err := it.nextFunc(); err != nil {
1300		return nil, err
1301	}
1302	item := it.items[0]
1303	it.items = it.items[1:]
1304	return item, nil
1305}
1306
1307// CreateAppProfile creates an app profile within an instance.
1308func (iac *InstanceAdminClient) CreateAppProfile(ctx context.Context, profile ProfileConf) (*btapb.AppProfile, error) {
1309	ctx = mergeOutgoingMetadata(ctx, iac.md)
1310	parent := "projects/" + iac.project + "/instances/" + profile.InstanceID
1311	appProfile := &btapb.AppProfile{
1312		Etag:        profile.Etag,
1313		Description: profile.Description,
1314	}
1315
1316	if profile.RoutingPolicy == "" {
1317		return nil, errors.New("invalid routing policy")
1318	}
1319
1320	switch profile.RoutingPolicy {
1321	case MultiClusterRouting:
1322		appProfile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1323			MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1324		}
1325	case SingleClusterRouting:
1326		appProfile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1327			SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1328				ClusterId:                profile.ClusterID,
1329				AllowTransactionalWrites: profile.AllowTransactionalWrites,
1330			},
1331		}
1332	default:
1333		return nil, errors.New("invalid routing policy")
1334	}
1335
1336	return iac.iClient.CreateAppProfile(ctx, &btapb.CreateAppProfileRequest{
1337		Parent:         parent,
1338		AppProfile:     appProfile,
1339		AppProfileId:   profile.ProfileID,
1340		IgnoreWarnings: profile.IgnoreWarnings,
1341	})
1342}
1343
1344// GetAppProfile gets information about an app profile.
1345func (iac *InstanceAdminClient) GetAppProfile(ctx context.Context, instanceID, name string) (*btapb.AppProfile, error) {
1346	ctx = mergeOutgoingMetadata(ctx, iac.md)
1347	profileRequest := &btapb.GetAppProfileRequest{
1348		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1349	}
1350	var ap *btapb.AppProfile
1351	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1352		var err error
1353		ap, err = iac.iClient.GetAppProfile(ctx, profileRequest)
1354		return err
1355	}, retryOptions...)
1356	if err != nil {
1357		return nil, err
1358	}
1359	return ap, err
1360}
1361
1362// ListAppProfiles lists information about app profiles in an instance.
1363func (iac *InstanceAdminClient) ListAppProfiles(ctx context.Context, instanceID string) *ProfileIterator {
1364	ctx = mergeOutgoingMetadata(ctx, iac.md)
1365	listRequest := &btapb.ListAppProfilesRequest{
1366		Parent: "projects/" + iac.project + "/instances/" + instanceID,
1367	}
1368
1369	pit := &ProfileIterator{}
1370	fetch := func(pageSize int, pageToken string) (string, error) {
1371		listRequest.PageToken = pageToken
1372		var profileRes *btapb.ListAppProfilesResponse
1373		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1374			var err error
1375			profileRes, err = iac.iClient.ListAppProfiles(ctx, listRequest)
1376			return err
1377		}, retryOptions...)
1378		if err != nil {
1379			return "", err
1380		}
1381
1382		pit.items = append(pit.items, profileRes.AppProfiles...)
1383		return profileRes.NextPageToken, nil
1384	}
1385
1386	bufLen := func() int { return len(pit.items) }
1387	takeBuf := func() interface{} { b := pit.items; pit.items = nil; return b }
1388	pit.pageInfo, pit.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
1389	return pit
1390
1391}
1392
1393// UpdateAppProfile updates an app profile within an instance.
1394// updateAttrs should be set. If unset, all fields will be replaced.
1395func (iac *InstanceAdminClient) UpdateAppProfile(ctx context.Context, instanceID, profileID string, updateAttrs ProfileAttrsToUpdate) error {
1396	ctx = mergeOutgoingMetadata(ctx, iac.md)
1397
1398	profile := &btapb.AppProfile{
1399		Name: "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + profileID,
1400	}
1401
1402	if updateAttrs.Description != nil {
1403		profile.Description = optional.ToString(updateAttrs.Description)
1404	}
1405	if updateAttrs.RoutingPolicy != nil {
1406		switch optional.ToString(updateAttrs.RoutingPolicy) {
1407		case MultiClusterRouting:
1408			profile.RoutingPolicy = &btapb.AppProfile_MultiClusterRoutingUseAny_{
1409				MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
1410			}
1411		case SingleClusterRouting:
1412			profile.RoutingPolicy = &btapb.AppProfile_SingleClusterRouting_{
1413				SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
1414					ClusterId:                updateAttrs.ClusterID,
1415					AllowTransactionalWrites: updateAttrs.AllowTransactionalWrites,
1416				},
1417			}
1418		default:
1419			return errors.New("invalid routing policy")
1420		}
1421	}
1422	patchRequest := &btapb.UpdateAppProfileRequest{
1423		AppProfile: profile,
1424		UpdateMask: &field_mask.FieldMask{
1425			Paths: updateAttrs.GetFieldMaskPath(),
1426		},
1427		IgnoreWarnings: updateAttrs.IgnoreWarnings,
1428	}
1429	updateRequest, err := iac.iClient.UpdateAppProfile(ctx, patchRequest)
1430	if err != nil {
1431		return err
1432	}
1433
1434	return longrunning.InternalNewOperation(iac.lroClient, updateRequest).Wait(ctx, nil)
1435
1436}
1437
1438// DeleteAppProfile deletes an app profile from an instance.
1439func (iac *InstanceAdminClient) DeleteAppProfile(ctx context.Context, instanceID, name string) error {
1440	ctx = mergeOutgoingMetadata(ctx, iac.md)
1441	deleteProfileRequest := &btapb.DeleteAppProfileRequest{
1442		Name:           "projects/" + iac.project + "/instances/" + instanceID + "/appProfiles/" + name,
1443		IgnoreWarnings: true,
1444	}
1445	_, err := iac.iClient.DeleteAppProfile(ctx, deleteProfileRequest)
1446	return err
1447
1448}
1449
1450// UpdateInstanceResults contains information about the
1451// changes made after invoking UpdateInstanceAndSyncClusters.
1452type UpdateInstanceResults struct {
1453	InstanceUpdated bool
1454	CreatedClusters []string
1455	DeletedClusters []string
1456	UpdatedClusters []string
1457}
1458
1459func (r *UpdateInstanceResults) String() string {
1460	return fmt.Sprintf("Instance updated? %v Clusters added:%v Clusters deleted:%v Clusters updated:%v",
1461		r.InstanceUpdated, r.CreatedClusters, r.DeletedClusters, r.UpdatedClusters)
1462}
1463
1464func max(x, y int) int {
1465	if x > y {
1466		return x
1467	}
1468	return y
1469}
1470
1471// UpdateInstanceAndSyncClusters updates an instance and its clusters, and will synchronize the
1472// clusters in the instance with the provided clusters, creating and deleting them as necessary.
1473// The provided InstanceWithClustersConfig is used as follows:
1474// - InstanceID is required
1475// - DisplayName and InstanceType are updated only if they are not empty
1476// - ClusterID is required for any provided cluster
1477// - Any cluster present in conf.Clusters but not part of the instance will be created using CreateCluster
1478//   and the given ClusterConfig.
1479// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance
1480//   using DeleteCluster.
1481// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the
1482//   provided number of nodes if set.
1483//
1484// This method may return an error after partially succeeding, for example if the instance is updated
1485// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
1486// determine the current state. The return UpdateInstanceResults will describe the work done by the
1487// method, whether partial or complete.
1488func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient, conf *InstanceWithClustersConfig) (*UpdateInstanceResults, error) {
1489	ctx = mergeOutgoingMetadata(ctx, iac.md)
1490
1491	// First fetch the existing clusters so we know what to remove, add or update.
1492	existingClusters, err := iac.Clusters(ctx, conf.InstanceID)
1493	if err != nil {
1494		return nil, err
1495	}
1496
1497	updatedInstance, err := iac.updateInstance(ctx, conf)
1498	if err != nil {
1499		return nil, err
1500	}
1501
1502	results := &UpdateInstanceResults{InstanceUpdated: updatedInstance}
1503
1504	existingClusterNames := make(map[string]bool)
1505	for _, cluster := range existingClusters {
1506		existingClusterNames[cluster.Name] = true
1507	}
1508
1509	// Synchronize clusters that were passed in with the existing clusters in the instance.
1510	// First update any cluster we encounter that already exists in the instance.
1511	// Collect the clusters that we will create and delete so that we can minimize disruption
1512	// of the instance.
1513	clustersToCreate := list.New()
1514	clustersToDelete := list.New()
1515	for _, cluster := range conf.Clusters {
1516		_, clusterExists := existingClusterNames[cluster.ClusterID]
1517		if !clusterExists {
1518			// The cluster doesn't exist yet, so we must create it.
1519			clustersToCreate.PushBack(cluster)
1520			continue
1521		}
1522		delete(existingClusterNames, cluster.ClusterID)
1523
1524		if cluster.NumNodes <= 0 {
1525			// We only synchronize clusters with a valid number of nodes.
1526			continue
1527		}
1528
1529		// We simply want to update this cluster
1530		err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
1531		if err != nil {
1532			return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
1533				cluster.ClusterID, err, results)
1534		}
1535		results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
1536	}
1537
1538	// Any cluster left in existingClusterNames was NOT in the given config and should be deleted.
1539	for clusterToDelete := range existingClusterNames {
1540		clustersToDelete.PushBack(clusterToDelete)
1541	}
1542
1543	// Now that we have the clusters that we need to create and delete, we do so keeping the following
1544	// in mind:
1545	// - Don't delete the last cluster in the instance, as that will result in an error.
1546	// - Attempt to offset each deletion with a creation before another deletion, so that instance
1547	//   capacity is never reduced more than necessary.
1548	// Note that there is a limit on number of clusters in an instance which we are not aware of here,
1549	// so delete a cluster before adding one (as long as there are > 1 clusters left) so that we are
1550	// less likely to exceed the maximum number of clusters.
1551	numExistingClusters := len(existingClusters)
1552	nextCreation := clustersToCreate.Front()
1553	nextDeletion := clustersToDelete.Front()
1554	for {
1555		// We are done when both lists are empty.
1556		if nextCreation == nil && nextDeletion == nil {
1557			break
1558		}
1559
1560		// If there is more than one existing cluster, we always want to delete first if possible.
1561		// If there are no more creations left, always go ahead with the deletion.
1562		if (numExistingClusters > 1 && nextDeletion != nil) || nextCreation == nil {
1563			clusterToDelete := nextDeletion.Value.(string)
1564			err = iac.DeleteCluster(ctx, conf.InstanceID, clusterToDelete)
1565			if err != nil {
1566				return results, fmt.Errorf("DeleteCluster %q failed %v; Progress: %v",
1567					clusterToDelete, err, results)
1568			}
1569			results.DeletedClusters = append(results.DeletedClusters, clusterToDelete)
1570			numExistingClusters--
1571			nextDeletion = nextDeletion.Next()
1572		}
1573
1574		// Now create a new cluster if required.
1575		if nextCreation != nil {
1576			clusterToCreate := nextCreation.Value.(ClusterConfig)
1577			// Assume the cluster config is well formed and rely on the underlying call to error out.
1578			// Make sure to set the InstanceID, though, since we know what it must be.
1579			clusterToCreate.InstanceID = conf.InstanceID
1580			err = iac.CreateCluster(ctx, &clusterToCreate)
1581			if err != nil {
1582				return results, fmt.Errorf("CreateCluster %v failed %v; Progress: %v",
1583					clusterToCreate, err, results)
1584			}
1585			results.CreatedClusters = append(results.CreatedClusters, clusterToCreate.ClusterID)
1586			numExistingClusters++
1587			nextCreation = nextCreation.Next()
1588		}
1589	}
1590
1591	return results, nil
1592}
1593
1594// RestoreTable creates a table from a backup. The table will be created in the same cluster as the backup.
1595// To restore a table to a different instance, see RestoreTableFrom.
1596func (ac *AdminClient) RestoreTable(ctx context.Context, table, cluster, backup string) error {
1597	return ac.RestoreTableFrom(ctx, ac.instance, table, cluster, backup)
1598}
1599
1600// RestoreTableFrom creates a new table in the admin's instance by restoring from the given backup and instance.
1601// To restore within the same instance, see RestoreTable.
1602// sourceInstance (ex. "my-instance") and sourceCluster (ex. "my-cluster") are the instance and cluster in which the new table will be restored from.
1603// tableName (ex. "my-restored-table") will be the name of the newly created table.
1604// backupName (ex. "my-backup") is the name of the backup to restore.
1605func (ac *AdminClient) RestoreTableFrom(ctx context.Context, sourceInstance, table, sourceCluster, backup string) error {
1606	ctx = mergeOutgoingMetadata(ctx, ac.md)
1607	parent := ac.instancePrefix()
1608	sourceBackupPath := ac.backupPath(sourceCluster, sourceInstance, backup)
1609	req := &btapb.RestoreTableRequest{
1610		Parent:  parent,
1611		TableId: table,
1612		Source:  &btapb.RestoreTableRequest_Backup{sourceBackupPath},
1613	}
1614	op, err := ac.tClient.RestoreTable(ctx, req)
1615	if err != nil {
1616		return err
1617	}
1618	resp := btapb.Table{}
1619	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
1620}
1621
1622// CreateBackup creates a new backup in the specified cluster from the
1623// specified source table with the user-provided expire time.
1624func (ac *AdminClient) CreateBackup(ctx context.Context, table, cluster, backup string, expireTime time.Time) error {
1625	ctx = mergeOutgoingMetadata(ctx, ac.md)
1626	prefix := ac.instancePrefix()
1627
1628	parsedExpireTime, err := ptypes.TimestampProto(expireTime)
1629	if err != nil {
1630		return err
1631	}
1632
1633	req := &btapb.CreateBackupRequest{
1634		Parent:   prefix + "/clusters/" + cluster,
1635		BackupId: backup,
1636		Backup: &btapb.Backup{
1637			ExpireTime:  parsedExpireTime,
1638			SourceTable: prefix + "/tables/" + table,
1639		},
1640	}
1641
1642	op, err := ac.tClient.CreateBackup(ctx, req)
1643	if err != nil {
1644		return err
1645	}
1646	resp := btapb.Backup{}
1647	return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
1648}
1649
1650// Backups returns a BackupIterator for iterating over the backups in a cluster.
1651// To list backups across all of the clusters in the instance specify "-" as the cluster.
1652func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupIterator {
1653	ctx = mergeOutgoingMetadata(ctx, ac.md)
1654	prefix := ac.instancePrefix()
1655	clusterPath := prefix + "/clusters/" + cluster
1656
1657	it := &BackupIterator{}
1658	req := &btapb.ListBackupsRequest{
1659		Parent: clusterPath,
1660	}
1661
1662	fetch := func(pageSize int, pageToken string) (string, error) {
1663		req.PageToken = pageToken
1664		if pageSize > math.MaxInt32 {
1665			req.PageSize = math.MaxInt32
1666		} else {
1667			req.PageSize = int32(pageSize)
1668		}
1669
1670		var resp *btapb.ListBackupsResponse
1671		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1672			var err error
1673			resp, err = ac.tClient.ListBackups(ctx, req)
1674			return err
1675		}, retryOptions...)
1676		if err != nil {
1677			return "", err
1678		}
1679		for _, s := range resp.Backups {
1680			backupInfo, err := newBackupInfo(s)
1681			if err != nil {
1682				return "", fmt.Errorf("failed to parse backup proto %v", err)
1683			}
1684			it.items = append(it.items, backupInfo)
1685		}
1686		return resp.NextPageToken, nil
1687	}
1688	bufLen := func() int { return len(it.items) }
1689	takeBuf := func() interface{} { b := it.items; it.items = nil; return b }
1690
1691	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, bufLen, takeBuf)
1692
1693	return it
1694}
1695
1696// newBackupInfo creates a BackupInfo struct from a btapb.Backup protocol buffer.
1697func newBackupInfo(backup *btapb.Backup) (*BackupInfo, error) {
1698	nameParts := strings.Split(backup.Name, "/")
1699	name := nameParts[len(nameParts)-1]
1700	tablePathParts := strings.Split(backup.SourceTable, "/")
1701	tableID := tablePathParts[len(tablePathParts)-1]
1702
1703	startTime, err := ptypes.Timestamp(backup.StartTime)
1704	if err != nil {
1705		return nil, fmt.Errorf("invalid startTime: %v", err)
1706	}
1707
1708	endTime, err := ptypes.Timestamp(backup.EndTime)
1709	if err != nil {
1710		return nil, fmt.Errorf("invalid endTime: %v", err)
1711	}
1712
1713	expireTime, err := ptypes.Timestamp(backup.ExpireTime)
1714	if err != nil {
1715		return nil, fmt.Errorf("invalid expireTime: %v", err)
1716	}
1717	encryptionInfo := newEncryptionInfo(backup.EncryptionInfo)
1718	bi := BackupInfo{
1719		Name:           name,
1720		SourceTable:    tableID,
1721		SizeBytes:      backup.SizeBytes,
1722		StartTime:      startTime,
1723		EndTime:        endTime,
1724		ExpireTime:     expireTime,
1725		State:          backup.State.String(),
1726		EncryptionInfo: encryptionInfo,
1727	}
1728
1729	return &bi, nil
1730}
1731
1732// BackupIterator is an EntryIterator that iterates over log entries.
1733type BackupIterator struct {
1734	items    []*BackupInfo
1735	pageInfo *iterator.PageInfo
1736	nextFunc func() error
1737}
1738
1739// PageInfo supports pagination. See https://godoc.org/google.golang.org/api/iterator package for details.
1740func (it *BackupIterator) PageInfo() *iterator.PageInfo {
1741	return it.pageInfo
1742}
1743
1744// Next returns the next result. Its second return value is iterator.Done
1745// (https://godoc.org/google.golang.org/api/iterator) if there are no more
1746// results. Once Next returns Done, all subsequent calls will return Done.
1747func (it *BackupIterator) Next() (*BackupInfo, error) {
1748	if err := it.nextFunc(); err != nil {
1749		return nil, err
1750	}
1751	item := it.items[0]
1752	it.items = it.items[1:]
1753	return item, nil
1754}
1755
1756// BackupInfo contains backup metadata. This struct is read-only.
1757type BackupInfo struct {
1758	Name           string
1759	SourceTable    string
1760	SizeBytes      int64
1761	StartTime      time.Time
1762	EndTime        time.Time
1763	ExpireTime     time.Time
1764	State          string
1765	EncryptionInfo *EncryptionInfo
1766}
1767
1768// BackupInfo gets backup metadata.
1769func (ac *AdminClient) BackupInfo(ctx context.Context, cluster, backup string) (*BackupInfo, error) {
1770	ctx = mergeOutgoingMetadata(ctx, ac.md)
1771	backupPath := ac.backupPath(cluster, ac.instance, backup)
1772
1773	req := &btapb.GetBackupRequest{
1774		Name: backupPath,
1775	}
1776
1777	var resp *btapb.Backup
1778	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
1779		var err error
1780		resp, err = ac.tClient.GetBackup(ctx, req)
1781		return err
1782	}, retryOptions...)
1783	if err != nil {
1784		return nil, err
1785	}
1786
1787	return newBackupInfo(resp)
1788}
1789
1790// DeleteBackup deletes a backup in a cluster.
1791func (ac *AdminClient) DeleteBackup(ctx context.Context, cluster, backup string) error {
1792	ctx = mergeOutgoingMetadata(ctx, ac.md)
1793	backupPath := ac.backupPath(cluster, ac.instance, backup)
1794
1795	req := &btapb.DeleteBackupRequest{
1796		Name: backupPath,
1797	}
1798	_, err := ac.tClient.DeleteBackup(ctx, req)
1799	return err
1800}
1801
1802// UpdateBackup updates the backup metadata in a cluster. The API only supports updating expire time.
1803func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string, expireTime time.Time) error {
1804	ctx = mergeOutgoingMetadata(ctx, ac.md)
1805	backupPath := ac.backupPath(cluster, ac.instance, backup)
1806
1807	expireTimestamp, err := ptypes.TimestampProto(expireTime)
1808	if err != nil {
1809		return err
1810	}
1811
1812	updateMask := &field_mask.FieldMask{}
1813	updateMask.Paths = append(updateMask.Paths, "expire_time")
1814
1815	req := &btapb.UpdateBackupRequest{
1816		Backup: &btapb.Backup{
1817			Name:       backupPath,
1818			ExpireTime: expireTimestamp,
1819		},
1820		UpdateMask: updateMask,
1821	}
1822	_, err = ac.tClient.UpdateBackup(ctx, req)
1823	return err
1824}
1825