/*
Copyright 2018 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"encoding/gob"
	"log"
	"time"

	"github.com/golang/protobuf/proto"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
)

// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
// arbitrarily large amounts of data from Cloud Spanner databases.
// BatchReadOnlyTransaction partitions a read/query request. The read/query
// request can then be executed independently over each partition while
// observing the same snapshot of the database. A BatchReadOnlyTransaction can
// also be shared across multiple clients by passing around the
// BatchReadOnlyTransactionID and then recreating the transaction using
// Client.BatchReadOnlyTransactionFromID.
//
// Note: if a client is used only to run partitions, you can
// create it using a ClientConfig with both MinOpened and MaxIdle set to
// zero to avoid creating unnecessary sessions. You can also avoid excess
// gRPC channels by setting ClientConfig.NumChannels to the number of
// concurrently active BatchReadOnlyTransactions you expect to have.
type BatchReadOnlyTransaction struct {
	ReadOnlyTransaction
	// ID identifies this transaction so that it can be shared with, and
	// re-created by, other clients.
	ID BatchReadOnlyTransactionID
}

// BatchReadOnlyTransactionID is a unique identifier for a
// BatchReadOnlyTransaction. It can be used to re-create a
// BatchReadOnlyTransaction on a different machine or process by calling
// Client.BatchReadOnlyTransactionFromID.
type BatchReadOnlyTransactionID struct {
	// tid is the unique ID for the transaction.
	tid transactionID
	// sid is the id of the Cloud Spanner session used for this transaction.
	sid string
	// rts is the read timestamp of this transaction.
	rts time.Time
}

// Partition defines a segment of data to be read in a batch read or query. A
// partition can be serialized and processed across several different machines
// or processes.
type Partition struct {
	// pt is the partition token returned by the PartitionRead or
	// PartitionQuery call that produced this Partition.
	pt []byte
	// qreq is the query request to execute; set for partitions produced by
	// PartitionQuery.
	qreq *sppb.ExecuteSqlRequest
	// rreq is the read request to execute; set for partitions produced by
	// PartitionRead. Execute and MarshalBinary check rreq before qreq.
	rreq *sppb.ReadRequest
}

// PartitionOptions specifies options for a PartitionQueryRequest and
// PartitionReadRequest. See
// https://godoc.org/google.golang.org/genproto/googleapis/spanner/v1#PartitionOptions
// for more details.
type PartitionOptions struct {
	// The desired data size for each partition generated.
	PartitionBytes int64
	// The desired maximum number of partitions to return.
	MaxPartitions int64
}

// toProto converts a spanner.PartitionOptions into a sppb.PartitionOptions.
// PartitionBytes maps to PartitionSizeBytes; MaxPartitions is copied as is.
func (opt PartitionOptions) toProto() *sppb.PartitionOptions {
	return &sppb.PartitionOptions{
		PartitionSizeBytes: opt.PartitionBytes,
		MaxPartitions:      opt.MaxPartitions,
	}
}

// PartitionRead returns a list of Partitions that can be used to read rows from
// the database. These partitions can be executed across multiple processes,
// even across different machines. The partition size and count hints can be
// configured using PartitionOptions.
93func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { 94 return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt) 95} 96 97// PartitionReadUsingIndex returns a list of Partitions that can be used to read 98// rows from the database using an index. 99func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { 100 sh, ts, err := t.acquire(ctx) 101 if err != nil { 102 return nil, err 103 } 104 sid, client := sh.getID(), sh.getClient() 105 var ( 106 kset *sppb.KeySet 107 resp *sppb.PartitionResponse 108 partitions []*Partition 109 ) 110 kset, err = keys.keySetProto() 111 // Request partitions. 112 if err != nil { 113 return nil, err 114 } 115 resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{ 116 Session: sid, 117 Transaction: ts, 118 Table: table, 119 Index: index, 120 Columns: columns, 121 KeySet: kset, 122 PartitionOptions: opt.toProto(), 123 }) 124 // Prepare ReadRequest. 125 req := &sppb.ReadRequest{ 126 Session: sid, 127 Transaction: ts, 128 Table: table, 129 Index: index, 130 Columns: columns, 131 KeySet: kset, 132 } 133 // Generate partitions. 134 for _, p := range resp.GetPartitions() { 135 partitions = append(partitions, &Partition{ 136 pt: p.PartitionToken, 137 rreq: req, 138 }) 139 } 140 return partitions, err 141} 142 143// PartitionQuery returns a list of Partitions that can be used to execute a 144// query against the database. 
145func (t *BatchReadOnlyTransaction) PartitionQuery(ctx context.Context, statement Statement, opt PartitionOptions) ([]*Partition, error) { 146 return t.partitionQuery(ctx, statement, opt, t.ReadOnlyTransaction.txReadOnly.qo) 147} 148 149// PartitionQueryWithOptions returns a list of Partitions that can be used to 150// execute a query against the database. The sql query execution will be 151// optimized based on the given query options. 152func (t *BatchReadOnlyTransaction) PartitionQueryWithOptions(ctx context.Context, statement Statement, opt PartitionOptions, qOpts QueryOptions) ([]*Partition, error) { 153 return t.partitionQuery(ctx, statement, opt, t.ReadOnlyTransaction.txReadOnly.qo.merge(qOpts)) 154} 155 156func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement Statement, opt PartitionOptions, qOpts QueryOptions) ([]*Partition, error) { 157 sh, ts, err := t.acquire(ctx) 158 if err != nil { 159 return nil, err 160 } 161 sid, client := sh.getID(), sh.getClient() 162 params, paramTypes, err := statement.convertParams() 163 if err != nil { 164 return nil, err 165 } 166 167 // request Partitions 168 req := &sppb.PartitionQueryRequest{ 169 Session: sid, 170 Transaction: ts, 171 Sql: statement.SQL, 172 PartitionOptions: opt.toProto(), 173 Params: params, 174 ParamTypes: paramTypes, 175 } 176 resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) 177 178 // prepare ExecuteSqlRequest 179 r := &sppb.ExecuteSqlRequest{ 180 Session: sid, 181 Transaction: ts, 182 Sql: statement.SQL, 183 Params: params, 184 ParamTypes: paramTypes, 185 QueryOptions: qOpts.Options, 186 } 187 188 // generate Partitions 189 var partitions []*Partition 190 for _, p := range resp.GetPartitions() { 191 partitions = append(partitions, &Partition{ 192 pt: p.PartitionToken, 193 qreq: r, 194 }) 195 } 196 return partitions, err 197} 198 199// release implements txReadEnv.release, noop. 
200func (t *BatchReadOnlyTransaction) release(err error) { 201} 202 203// setTimestamp implements txReadEnv.setTimestamp, noop. 204// 205// read timestamp is ready on txn initialization, avoid contending writing to it 206// with future partitions. 207func (t *BatchReadOnlyTransaction) setTimestamp(ts time.Time) { 208} 209 210// Close marks the txn as closed. 211func (t *BatchReadOnlyTransaction) Close() { 212 t.mu.Lock() 213 defer t.mu.Unlock() 214 t.state = txClosed 215} 216 217// Cleanup cleans up all the resources used by this transaction and makes 218// it unusable. Once this method is invoked, the transaction is no longer 219// usable anywhere, including other clients/processes with which this 220// transaction was shared. 221// 222// Calling Cleanup is optional, but recommended. If Cleanup is not called, the 223// transaction's resources will be freed when the session expires on the backend 224// and is deleted. For more information about recycled sessions, see 225// https://cloud.google.com/spanner/docs/sessions. 226func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) { 227 t.Close() 228 t.mu.Lock() 229 defer t.mu.Unlock() 230 sh := t.sh 231 if sh == nil { 232 return 233 } 234 t.sh = nil 235 sid, client := sh.getID(), sh.getClient() 236 err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}) 237 if err != nil { 238 var logger *log.Logger 239 if sh.session != nil { 240 logger = sh.session.logger 241 } 242 logf(logger, "Failed to delete session %v. Error: %v", sid, err) 243 } 244} 245 246// Execute runs a single Partition obtained from PartitionRead or 247// PartitionQuery. 
248func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *RowIterator { 249 var ( 250 sh *sessionHandle 251 err error 252 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) 253 ) 254 if sh, _, err = t.acquire(ctx); err != nil { 255 return &RowIterator{err: err} 256 } 257 client := sh.getClient() 258 if client == nil { 259 // Might happen if transaction is closed in the middle of a API call. 260 return &RowIterator{err: errSessionClosed(sh)} 261 } 262 // Read or query partition. 263 if p.rreq != nil { 264 rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 265 return client.StreamingRead(ctx, &sppb.ReadRequest{ 266 Session: p.rreq.Session, 267 Transaction: p.rreq.Transaction, 268 Table: p.rreq.Table, 269 Index: p.rreq.Index, 270 Columns: p.rreq.Columns, 271 KeySet: p.rreq.KeySet, 272 PartitionToken: p.pt, 273 ResumeToken: resumeToken, 274 }) 275 } 276 } else { 277 rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 278 return client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{ 279 Session: p.qreq.Session, 280 Transaction: p.qreq.Transaction, 281 Sql: p.qreq.Sql, 282 Params: p.qreq.Params, 283 ParamTypes: p.qreq.ParamTypes, 284 QueryOptions: p.qreq.QueryOptions, 285 PartitionToken: p.pt, 286 ResumeToken: resumeToken, 287 }) 288 } 289 } 290 return stream( 291 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 292 sh.session.logger, 293 rpc, 294 t.setTimestamp, 295 t.release) 296} 297 298// MarshalBinary implements BinaryMarshaler. 
299func (tid BatchReadOnlyTransactionID) MarshalBinary() (data []byte, err error) { 300 var buf bytes.Buffer 301 enc := gob.NewEncoder(&buf) 302 if err := enc.Encode(tid.tid); err != nil { 303 return nil, err 304 } 305 if err := enc.Encode(tid.sid); err != nil { 306 return nil, err 307 } 308 if err := enc.Encode(tid.rts); err != nil { 309 return nil, err 310 } 311 return buf.Bytes(), nil 312} 313 314// UnmarshalBinary implements BinaryUnmarshaler. 315func (tid *BatchReadOnlyTransactionID) UnmarshalBinary(data []byte) error { 316 dec := gob.NewDecoder(bytes.NewReader(data)) 317 if err := dec.Decode(&tid.tid); err != nil { 318 return err 319 } 320 if err := dec.Decode(&tid.sid); err != nil { 321 return err 322 } 323 return dec.Decode(&tid.rts) 324} 325 326// MarshalBinary implements BinaryMarshaler. 327func (p Partition) MarshalBinary() (data []byte, err error) { 328 var buf bytes.Buffer 329 enc := gob.NewEncoder(&buf) 330 if err := enc.Encode(p.pt); err != nil { 331 return nil, err 332 } 333 var isReadPartition bool 334 var req proto.Message 335 if p.rreq != nil { 336 isReadPartition = true 337 req = p.rreq 338 } else { 339 isReadPartition = false 340 req = p.qreq 341 } 342 if err := enc.Encode(isReadPartition); err != nil { 343 return nil, err 344 } 345 if data, err = proto.Marshal(req); err != nil { 346 return nil, err 347 } 348 if err := enc.Encode(data); err != nil { 349 return nil, err 350 } 351 return buf.Bytes(), nil 352} 353 354// UnmarshalBinary implements BinaryUnmarshaler. 
355func (p *Partition) UnmarshalBinary(data []byte) error { 356 var ( 357 isReadPartition bool 358 d []byte 359 err error 360 ) 361 dec := gob.NewDecoder(bytes.NewReader(data)) 362 if err := dec.Decode(&p.pt); err != nil { 363 return err 364 } 365 if err := dec.Decode(&isReadPartition); err != nil { 366 return err 367 } 368 if err := dec.Decode(&d); err != nil { 369 return err 370 } 371 if isReadPartition { 372 p.rreq = &sppb.ReadRequest{} 373 err = proto.Unmarshal(d, p.rreq) 374 } else { 375 p.qreq = &sppb.ExecuteSqlRequest{} 376 err = proto.Unmarshal(d, p.qreq) 377 } 378 return err 379} 380