1/*
2Copyright 2018 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"bytes"
21	"context"
22	"encoding/gob"
23	"log"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	sppb "google.golang.org/genproto/googleapis/spanner/v1"
28)
29
// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
// arbitrarily large amounts of data from Cloud Spanner databases.
// BatchReadOnlyTransaction partitions a read/query request. Read/query requests
// can then be executed independently over each partition while observing the
// same snapshot of the database. BatchReadOnlyTransaction can also be shared
// across multiple clients by passing around the BatchReadOnlyTransactionID and
// then recreating the transaction using Client.BatchReadOnlyTransactionFromID.
//
// Note: if a client is used only to run partitions, you can
// create it using a ClientConfig with both MinOpened and MaxIdle set to
// zero to avoid creating unnecessary sessions. You can also avoid excess
// gRPC channels by setting ClientConfig.NumChannels to the number of
// concurrently active BatchReadOnlyTransactions you expect to have.
type BatchReadOnlyTransaction struct {
	// ReadOnlyTransaction supplies the session handle, mutex, transaction
	// state and acquire logic that the batch methods in this file rely on.
	ReadOnlyTransaction
	// ID identifies this transaction so that it can be serialized and
	// re-created elsewhere with Client.BatchReadOnlyTransactionFromID.
	ID BatchReadOnlyTransactionID
}
47
// BatchReadOnlyTransactionID is a unique identifier for a
// BatchReadOnlyTransaction. It can be used to re-create a
// BatchReadOnlyTransaction on a different machine or process by calling
// Client.BatchReadOnlyTransactionFromID.
//
// All three fields are serialized by MarshalBinary/UnmarshalBinary, so the ID
// can be shipped between processes as an opaque byte slice.
type BatchReadOnlyTransactionID struct {
	// tid is the unique ID of the Cloud Spanner transaction.
	tid transactionID
	// sid is the id of the Cloud Spanner session used for this transaction.
	sid string
	// rts is the read timestamp of this transaction; every partition of the
	// transaction observes the database as of this time.
	rts time.Time
}
60
// Partition defines a segment of data to be read in a batch read or query. A
// partition can be serialized and processed across several different machines
// or processes.
//
// pt is the partition token returned by the server for this segment. Exactly
// one of rreq (for partitions produced by PartitionRead/PartitionReadUsingIndex)
// or qreq (for partitions produced by PartitionQuery/PartitionQueryWithOptions)
// is set; it holds the request that Execute sends for this partition.
type Partition struct {
	pt   []byte
	qreq *sppb.ExecuteSqlRequest
	rreq *sppb.ReadRequest
}
69
// PartitionOptions specifies options for a PartitionQueryRequest and
// PartitionReadRequest. Both values are hints passed through to the server.
// See
// https://godoc.org/google.golang.org/genproto/googleapis/spanner/v1#PartitionOptions
// for more details.
type PartitionOptions struct {
	// The desired data size, in bytes, for each partition generated. Sent to
	// the server as PartitionOptions.PartitionSizeBytes.
	PartitionBytes int64
	// The desired maximum number of partitions to return. Sent to the server
	// as PartitionOptions.MaxPartitions.
	MaxPartitions int64
}
80
81// toProto converts a spanner.PartitionOptions into a sppb.PartitionOptions
82func (opt PartitionOptions) toProto() *sppb.PartitionOptions {
83	return &sppb.PartitionOptions{
84		PartitionSizeBytes: opt.PartitionBytes,
85		MaxPartitions:      opt.MaxPartitions,
86	}
87}
88
89// PartitionRead returns a list of Partitions that can be used to read rows from
90// the database. These partitions can be executed across multiple processes,
91// even across different machines. The partition size and count hints can be
92// configured using PartitionOptions.
93func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
94	return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt)
95}
96
97// PartitionReadUsingIndex returns a list of Partitions that can be used to read
98// rows from the database using an index.
99func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
100	sh, ts, err := t.acquire(ctx)
101	if err != nil {
102		return nil, err
103	}
104	sid, client := sh.getID(), sh.getClient()
105	var (
106		kset       *sppb.KeySet
107		resp       *sppb.PartitionResponse
108		partitions []*Partition
109	)
110	kset, err = keys.keySetProto()
111	// Request partitions.
112	if err != nil {
113		return nil, err
114	}
115	resp, err = client.PartitionRead(ctx, &sppb.PartitionReadRequest{
116		Session:          sid,
117		Transaction:      ts,
118		Table:            table,
119		Index:            index,
120		Columns:          columns,
121		KeySet:           kset,
122		PartitionOptions: opt.toProto(),
123	})
124	// Prepare ReadRequest.
125	req := &sppb.ReadRequest{
126		Session:     sid,
127		Transaction: ts,
128		Table:       table,
129		Index:       index,
130		Columns:     columns,
131		KeySet:      kset,
132	}
133	// Generate partitions.
134	for _, p := range resp.GetPartitions() {
135		partitions = append(partitions, &Partition{
136			pt:   p.PartitionToken,
137			rreq: req,
138		})
139	}
140	return partitions, err
141}
142
143// PartitionQuery returns a list of Partitions that can be used to execute a
144// query against the database.
145func (t *BatchReadOnlyTransaction) PartitionQuery(ctx context.Context, statement Statement, opt PartitionOptions) ([]*Partition, error) {
146	return t.partitionQuery(ctx, statement, opt, t.ReadOnlyTransaction.txReadOnly.qo)
147}
148
149// PartitionQueryWithOptions returns a list of Partitions that can be used to
150// execute a query against the database. The sql query execution will be
151// optimized based on the given query options.
152func (t *BatchReadOnlyTransaction) PartitionQueryWithOptions(ctx context.Context, statement Statement, opt PartitionOptions, qOpts QueryOptions) ([]*Partition, error) {
153	return t.partitionQuery(ctx, statement, opt, t.ReadOnlyTransaction.txReadOnly.qo.merge(qOpts))
154}
155
156func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement Statement, opt PartitionOptions, qOpts QueryOptions) ([]*Partition, error) {
157	sh, ts, err := t.acquire(ctx)
158	if err != nil {
159		return nil, err
160	}
161	sid, client := sh.getID(), sh.getClient()
162	params, paramTypes, err := statement.convertParams()
163	if err != nil {
164		return nil, err
165	}
166
167	// request Partitions
168	req := &sppb.PartitionQueryRequest{
169		Session:          sid,
170		Transaction:      ts,
171		Sql:              statement.SQL,
172		PartitionOptions: opt.toProto(),
173		Params:           params,
174		ParamTypes:       paramTypes,
175	}
176	resp, err := client.PartitionQuery(ctx, req)
177
178	// prepare ExecuteSqlRequest
179	r := &sppb.ExecuteSqlRequest{
180		Session:      sid,
181		Transaction:  ts,
182		Sql:          statement.SQL,
183		Params:       params,
184		ParamTypes:   paramTypes,
185		QueryOptions: qOpts.Options,
186	}
187
188	// generate Partitions
189	var partitions []*Partition
190	for _, p := range resp.GetPartitions() {
191		partitions = append(partitions, &Partition{
192			pt:   p.PartitionToken,
193			qreq: r,
194		})
195	}
196	return partitions, err
197}
198
199// release implements txReadEnv.release, noop.
200func (t *BatchReadOnlyTransaction) release(err error) {
201}
202
203// setTimestamp implements txReadEnv.setTimestamp, noop.
204//
205// read timestamp is ready on txn initialization, avoid contending writing to it
206// with future partitions.
207func (t *BatchReadOnlyTransaction) setTimestamp(ts time.Time) {
208}
209
210// Close marks the txn as closed.
211func (t *BatchReadOnlyTransaction) Close() {
212	t.mu.Lock()
213	defer t.mu.Unlock()
214	t.state = txClosed
215}
216
217// Cleanup cleans up all the resources used by this transaction and makes
218// it unusable. Once this method is invoked, the transaction is no longer
219// usable anywhere, including other clients/processes with which this
220// transaction was shared.
221//
222// Calling Cleanup is optional, but recommended. If Cleanup is not called, the
223// transaction's resources will be freed when the session expires on the backend
224// and is deleted. For more information about recycled sessions, see
225// https://cloud.google.com/spanner/docs/sessions.
226func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
227	t.Close()
228	t.mu.Lock()
229	defer t.mu.Unlock()
230	sh := t.sh
231	if sh == nil {
232		return
233	}
234	t.sh = nil
235	sid, client := sh.getID(), sh.getClient()
236	err := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid})
237	if err != nil {
238		var logger *log.Logger
239		if sh.session != nil {
240			logger = sh.session.logger
241		}
242		logf(logger, "Failed to delete session %v. Error: %v", sid, err)
243	}
244}
245
246// Execute runs a single Partition obtained from PartitionRead or
247// PartitionQuery.
248func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *RowIterator {
249	var (
250		sh  *sessionHandle
251		err error
252		rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error)
253	)
254	if sh, _, err = t.acquire(ctx); err != nil {
255		return &RowIterator{err: err}
256	}
257	client := sh.getClient()
258	if client == nil {
259		// Might happen if transaction is closed in the middle of a API call.
260		return &RowIterator{err: errSessionClosed(sh)}
261	}
262	// Read or query partition.
263	if p.rreq != nil {
264		p.rreq.PartitionToken = p.pt
265		rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
266			p.rreq.ResumeToken = resumeToken
267			return client.StreamingRead(ctx, p.rreq)
268		}
269	} else {
270		p.qreq.PartitionToken = p.pt
271		rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
272			p.qreq.ResumeToken = resumeToken
273			return client.ExecuteStreamingSql(ctx, p.qreq)
274		}
275	}
276	return stream(
277		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
278		sh.session.logger,
279		rpc,
280		t.setTimestamp,
281		t.release)
282}
283
284// MarshalBinary implements BinaryMarshaler.
285func (tid BatchReadOnlyTransactionID) MarshalBinary() (data []byte, err error) {
286	var buf bytes.Buffer
287	enc := gob.NewEncoder(&buf)
288	if err := enc.Encode(tid.tid); err != nil {
289		return nil, err
290	}
291	if err := enc.Encode(tid.sid); err != nil {
292		return nil, err
293	}
294	if err := enc.Encode(tid.rts); err != nil {
295		return nil, err
296	}
297	return buf.Bytes(), nil
298}
299
300// UnmarshalBinary implements BinaryUnmarshaler.
301func (tid *BatchReadOnlyTransactionID) UnmarshalBinary(data []byte) error {
302	dec := gob.NewDecoder(bytes.NewReader(data))
303	if err := dec.Decode(&tid.tid); err != nil {
304		return err
305	}
306	if err := dec.Decode(&tid.sid); err != nil {
307		return err
308	}
309	return dec.Decode(&tid.rts)
310}
311
312// MarshalBinary implements BinaryMarshaler.
313func (p Partition) MarshalBinary() (data []byte, err error) {
314	var buf bytes.Buffer
315	enc := gob.NewEncoder(&buf)
316	if err := enc.Encode(p.pt); err != nil {
317		return nil, err
318	}
319	var isReadPartition bool
320	var req proto.Message
321	if p.rreq != nil {
322		isReadPartition = true
323		req = p.rreq
324	} else {
325		isReadPartition = false
326		req = p.qreq
327	}
328	if err := enc.Encode(isReadPartition); err != nil {
329		return nil, err
330	}
331	if data, err = proto.Marshal(req); err != nil {
332		return nil, err
333	}
334	if err := enc.Encode(data); err != nil {
335		return nil, err
336	}
337	return buf.Bytes(), nil
338}
339
340// UnmarshalBinary implements BinaryUnmarshaler.
341func (p *Partition) UnmarshalBinary(data []byte) error {
342	var (
343		isReadPartition bool
344		d               []byte
345		err             error
346	)
347	dec := gob.NewDecoder(bytes.NewReader(data))
348	if err := dec.Decode(&p.pt); err != nil {
349		return err
350	}
351	if err := dec.Decode(&isReadPartition); err != nil {
352		return err
353	}
354	if err := dec.Decode(&d); err != nil {
355		return err
356	}
357	if isReadPartition {
358		p.rreq = &sppb.ReadRequest{}
359		err = proto.Unmarshal(d, p.rreq)
360	} else {
361		p.qreq = &sppb.ExecuteSqlRequest{}
362		err = proto.Unmarshal(d, p.qreq)
363	}
364	return err
365}
366