1/*
2Copyright 2018 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"bytes"
21	"context"
22	"encoding/gob"
23	"log"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	sppb "google.golang.org/genproto/googleapis/spanner/v1"
28)
29
// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
// arbitrarily large amounts of data from Cloud Spanner databases.
//
// A BatchReadOnlyTransaction partitions a read or query request (see
// PartitionRead, PartitionReadUsingIndex and PartitionQuery). Each resulting
// Partition can then be executed independently with Execute while observing
// the same snapshot of the database. A BatchReadOnlyTransaction can also be
// shared across multiple clients by passing around its
// BatchReadOnlyTransactionID and then recreating the transaction using
// Client.BatchReadOnlyTransactionFromID.
//
// Note: if a client is used only to run partitions, you can create it using a
// ClientConfig with both MinOpened and MaxIdle set to zero to avoid creating
// unnecessary sessions. You can also avoid excess gRPC channels by setting
// ClientConfig.NumChannels to the number of concurrently active
// BatchReadOnlyTransactions you expect to have.
type BatchReadOnlyTransaction struct {
	// The embedded ReadOnlyTransaction supplies the state this type's methods
	// use (for example the mutex, state and session handle touched by Close
	// and Cleanup) and promotes its own methods onto BatchReadOnlyTransaction.
	ReadOnlyTransaction
	// ID identifies this transaction. It can be serialized and handed to
	// another process to re-create the transaction there.
	ID BatchReadOnlyTransactionID
}
47
// BatchReadOnlyTransactionID is a unique identifier for a
// BatchReadOnlyTransaction. It can be used to re-create a
// BatchReadOnlyTransaction on a different machine or process by calling
// Client.BatchReadOnlyTransactionFromID.
//
// All fields are unexported; use MarshalBinary and UnmarshalBinary to move a
// BatchReadOnlyTransactionID between processes.
type BatchReadOnlyTransactionID struct {
	// tid is the unique ID for the transaction.
	tid transactionID
	// sid is the id of the Cloud Spanner session used for this transaction.
	sid string
	// rts is the read timestamp of this transaction.
	rts time.Time
}
60
// Partition defines a segment of data to be read in a batch read or query. A
// partition can be serialized and processed across several different machines
// or processes.
//
// Fields:
//   - pt is the partition token taken from the partition response.
//   - qreq is the query request to run for this partition; it is set by
//     PartitionQuery.
//   - rreq is the read request to run for this partition; it is set by
//     PartitionRead and PartitionReadUsingIndex.
//
// Execute and MarshalBinary treat a Partition with a non-nil rreq as a read
// partition and any other Partition as a query partition.
type Partition struct {
	pt   []byte
	qreq *sppb.ExecuteSqlRequest
	rreq *sppb.ReadRequest
}
69
// PartitionOptions specifies options for a PartitionQueryRequest and
// PartitionReadRequest. Both values are passed through unchanged to the
// request's sppb.PartitionOptions. See
// https://godoc.org/google.golang.org/genproto/googleapis/spanner/v1#PartitionOptions
// for more details.
type PartitionOptions struct {
	// PartitionBytes is the desired data size for each partition generated.
	// It is sent as PartitionSizeBytes.
	PartitionBytes int64
	// MaxPartitions is the desired maximum number of partitions to return.
	MaxPartitions int64
}
80
81// toProto converts a spanner.PartitionOptions into a sppb.PartitionOptions
82func (opt PartitionOptions) toProto() *sppb.PartitionOptions {
83	return &sppb.PartitionOptions{
84		PartitionSizeBytes: opt.PartitionBytes,
85		MaxPartitions:      opt.MaxPartitions,
86	}
87}
88
89// PartitionRead returns a list of Partitions that can be used to read rows from
90// the database. These partitions can be executed across multiple processes,
91// even across different machines. The partition size and count hints can be
92// configured using PartitionOptions.
93func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
94	return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt)
95}
96
97// PartitionReadUsingIndex returns a list of Partitions that can be used to read
98// rows from the database using an index.
99func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
100	sh, ts, err := t.acquire(ctx)
101	if err != nil {
102		return nil, err
103	}
104	sid, client := sh.getID(), sh.getClient()
105	var (
106		kset       *sppb.KeySet
107		resp       *sppb.PartitionResponse
108		partitions []*Partition
109	)
110	kset, err = keys.keySetProto()
111	// Request partitions.
112	if err != nil {
113		return nil, err
114	}
115	resp, err = client.PartitionRead(ctx, &sppb.PartitionReadRequest{
116		Session:          sid,
117		Transaction:      ts,
118		Table:            table,
119		Index:            index,
120		Columns:          columns,
121		KeySet:           kset,
122		PartitionOptions: opt.toProto(),
123	})
124	// Prepare ReadRequest.
125	req := &sppb.ReadRequest{
126		Session:     sid,
127		Transaction: ts,
128		Table:       table,
129		Index:       index,
130		Columns:     columns,
131		KeySet:      kset,
132	}
133	// Generate partitions.
134	for _, p := range resp.GetPartitions() {
135		partitions = append(partitions, &Partition{
136			pt:   p.PartitionToken,
137			rreq: req,
138		})
139	}
140	return partitions, err
141}
142
143// PartitionQuery returns a list of Partitions that can be used to execute a
144// query against the database.
145func (t *BatchReadOnlyTransaction) PartitionQuery(ctx context.Context, statement Statement, opt PartitionOptions) ([]*Partition, error) {
146	sh, ts, err := t.acquire(ctx)
147	if err != nil {
148		return nil, err
149	}
150	sid, client := sh.getID(), sh.getClient()
151	params, paramTypes, err := statement.convertParams()
152	if err != nil {
153		return nil, err
154	}
155
156	// request Partitions
157	req := &sppb.PartitionQueryRequest{
158		Session:          sid,
159		Transaction:      ts,
160		Sql:              statement.SQL,
161		PartitionOptions: opt.toProto(),
162		Params:           params,
163		ParamTypes:       paramTypes,
164	}
165	resp, err := client.PartitionQuery(ctx, req)
166
167	// prepare ExecuteSqlRequest
168	r := &sppb.ExecuteSqlRequest{
169		Session:     sid,
170		Transaction: ts,
171		Sql:         statement.SQL,
172		Params:      params,
173		ParamTypes:  paramTypes,
174	}
175
176	// generate Partitions
177	var partitions []*Partition
178	for _, p := range resp.GetPartitions() {
179		partitions = append(partitions, &Partition{
180			pt:   p.PartitionToken,
181			qreq: r,
182		})
183	}
184	return partitions, err
185}
186
187// release implements txReadEnv.release, noop.
188func (t *BatchReadOnlyTransaction) release(err error) {
189}
190
191// setTimestamp implements txReadEnv.setTimestamp, noop.
192//
193// read timestamp is ready on txn initialization, avoid contending writing to it
194// with future partitions.
195func (t *BatchReadOnlyTransaction) setTimestamp(ts time.Time) {
196}
197
198// Close marks the txn as closed.
199func (t *BatchReadOnlyTransaction) Close() {
200	t.mu.Lock()
201	defer t.mu.Unlock()
202	t.state = txClosed
203}
204
205// Cleanup cleans up all the resources used by this transaction and makes
206// it unusable. Once this method is invoked, the transaction is no longer
207// usable anywhere, including other clients/processes with which this
208// transaction was shared.
209//
210// Calling Cleanup is optional, but recommended. If Cleanup is not called, the
211// transaction's resources will be freed when the session expires on the backend
212// and is deleted. For more information about recycled sessions, see
213// https://cloud.google.com/spanner/docs/sessions.
214func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
215	t.Close()
216	t.mu.Lock()
217	defer t.mu.Unlock()
218	sh := t.sh
219	if sh == nil {
220		return
221	}
222	t.sh = nil
223	sid, client := sh.getID(), sh.getClient()
224	err := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid})
225	if err != nil {
226		var logger *log.Logger
227		if sh.session != nil {
228			logger = sh.session.logger
229		}
230		logf(logger, "Failed to delete session %v. Error: %v", sid, err)
231	}
232}
233
234// Execute runs a single Partition obtained from PartitionRead or
235// PartitionQuery.
236func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *RowIterator {
237	var (
238		sh  *sessionHandle
239		err error
240		rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error)
241	)
242	if sh, _, err = t.acquire(ctx); err != nil {
243		return &RowIterator{err: err}
244	}
245	client := sh.getClient()
246	if client == nil {
247		// Might happen if transaction is closed in the middle of a API call.
248		return &RowIterator{err: errSessionClosed(sh)}
249	}
250	// Read or query partition.
251	if p.rreq != nil {
252		p.rreq.PartitionToken = p.pt
253		rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
254			p.rreq.ResumeToken = resumeToken
255			return client.StreamingRead(ctx, p.rreq)
256		}
257	} else {
258		p.qreq.PartitionToken = p.pt
259		rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
260			p.qreq.ResumeToken = resumeToken
261			return client.ExecuteStreamingSql(ctx, p.qreq)
262		}
263	}
264	return stream(
265		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
266		sh.session.logger,
267		rpc,
268		t.setTimestamp,
269		t.release)
270}
271
272// MarshalBinary implements BinaryMarshaler.
273func (tid BatchReadOnlyTransactionID) MarshalBinary() (data []byte, err error) {
274	var buf bytes.Buffer
275	enc := gob.NewEncoder(&buf)
276	if err := enc.Encode(tid.tid); err != nil {
277		return nil, err
278	}
279	if err := enc.Encode(tid.sid); err != nil {
280		return nil, err
281	}
282	if err := enc.Encode(tid.rts); err != nil {
283		return nil, err
284	}
285	return buf.Bytes(), nil
286}
287
288// UnmarshalBinary implements BinaryUnmarshaler.
289func (tid *BatchReadOnlyTransactionID) UnmarshalBinary(data []byte) error {
290	dec := gob.NewDecoder(bytes.NewReader(data))
291	if err := dec.Decode(&tid.tid); err != nil {
292		return err
293	}
294	if err := dec.Decode(&tid.sid); err != nil {
295		return err
296	}
297	return dec.Decode(&tid.rts)
298}
299
300// MarshalBinary implements BinaryMarshaler.
301func (p Partition) MarshalBinary() (data []byte, err error) {
302	var buf bytes.Buffer
303	enc := gob.NewEncoder(&buf)
304	if err := enc.Encode(p.pt); err != nil {
305		return nil, err
306	}
307	var isReadPartition bool
308	var req proto.Message
309	if p.rreq != nil {
310		isReadPartition = true
311		req = p.rreq
312	} else {
313		isReadPartition = false
314		req = p.qreq
315	}
316	if err := enc.Encode(isReadPartition); err != nil {
317		return nil, err
318	}
319	if data, err = proto.Marshal(req); err != nil {
320		return nil, err
321	}
322	if err := enc.Encode(data); err != nil {
323		return nil, err
324	}
325	return buf.Bytes(), nil
326}
327
328// UnmarshalBinary implements BinaryUnmarshaler.
329func (p *Partition) UnmarshalBinary(data []byte) error {
330	var (
331		isReadPartition bool
332		d               []byte
333		err             error
334	)
335	dec := gob.NewDecoder(bytes.NewReader(data))
336	if err := dec.Decode(&p.pt); err != nil {
337		return err
338	}
339	if err := dec.Decode(&isReadPartition); err != nil {
340		return err
341	}
342	if err := dec.Decode(&d); err != nil {
343		return err
344	}
345	if isReadPartition {
346		p.rreq = &sppb.ReadRequest{}
347		err = proto.Unmarshal(d, p.rreq)
348	} else {
349		p.qreq = &sppb.ExecuteSqlRequest{}
350		err = proto.Unmarshal(d, p.qreq)
351	}
352	return err
353}
354