1/* 2Copyright 2018 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "bytes" 21 "context" 22 "encoding/gob" 23 "log" 24 "time" 25 26 "github.com/golang/protobuf/proto" 27 sppb "google.golang.org/genproto/googleapis/spanner/v1" 28) 29 30// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting 31// arbitrarily large amounts of data from Cloud Spanner databases. 32// BatchReadOnlyTransaction partitions a read/query request. Read/query request 33// can then be executed independently over each partition while observing the 34// same snapshot of the database. BatchReadOnlyTransaction can also be shared 35// across multiple clients by passing around the BatchReadOnlyTransactionID and 36// then recreating the transaction using Client.BatchReadOnlyTransactionFromID. 37// 38// Note: if a client is used only to run partitions, you can 39// create it using a ClientConfig with both MinOpened and MaxIdle set to 40// zero to avoid creating unnecessary sessions. You can also avoid excess 41// gRPC channels by setting ClientConfig.NumChannels to the number of 42// concurrently active BatchReadOnlyTransactions you expect to have. 43type BatchReadOnlyTransaction struct { 44 ReadOnlyTransaction 45 ID BatchReadOnlyTransactionID 46} 47 48// BatchReadOnlyTransactionID is a unique identifier for a 49// BatchReadOnlyTransaction. 
It can be used to re-create a 50// BatchReadOnlyTransaction on a different machine or process by calling 51// Client.BatchReadOnlyTransactionFromID. 52type BatchReadOnlyTransactionID struct { 53 // unique ID for the transaction. 54 tid transactionID 55 // sid is the id of the Cloud Spanner session used for this transaction. 56 sid string 57 // rts is the read timestamp of this transaction. 58 rts time.Time 59} 60 61// Partition defines a segment of data to be read in a batch read or query. A 62// partition can be serialized and processed across several different machines 63// or processes. 64type Partition struct { 65 pt []byte 66 qreq *sppb.ExecuteSqlRequest 67 rreq *sppb.ReadRequest 68} 69 70// PartitionOptions specifies options for a PartitionQueryRequest and 71// PartitionReadRequest. See 72// https://godoc.org/google.golang.org/genproto/googleapis/spanner/v1#PartitionOptions 73// for more details. 74type PartitionOptions struct { 75 // The desired data size for each partition generated. 76 PartitionBytes int64 77 // The desired maximum number of partitions to return. 78 MaxPartitions int64 79} 80 81// toProto converts a spanner.PartitionOptions into a sppb.PartitionOptions 82func (opt PartitionOptions) toProto() *sppb.PartitionOptions { 83 return &sppb.PartitionOptions{ 84 PartitionSizeBytes: opt.PartitionBytes, 85 MaxPartitions: opt.MaxPartitions, 86 } 87} 88 89// PartitionRead returns a list of Partitions that can be used to read rows from 90// the database. These partitions can be executed across multiple processes, 91// even across different machines. The partition size and count hints can be 92// configured using PartitionOptions. 
93func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { 94 return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt) 95} 96 97// PartitionReadUsingIndex returns a list of Partitions that can be used to read 98// rows from the database using an index. 99func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { 100 sh, ts, err := t.acquire(ctx) 101 if err != nil { 102 return nil, err 103 } 104 sid, client := sh.getID(), sh.getClient() 105 var ( 106 kset *sppb.KeySet 107 resp *sppb.PartitionResponse 108 partitions []*Partition 109 ) 110 kset, err = keys.keySetProto() 111 // Request partitions. 112 if err != nil { 113 return nil, err 114 } 115 resp, err = client.PartitionRead(ctx, &sppb.PartitionReadRequest{ 116 Session: sid, 117 Transaction: ts, 118 Table: table, 119 Index: index, 120 Columns: columns, 121 KeySet: kset, 122 PartitionOptions: opt.toProto(), 123 }) 124 // Prepare ReadRequest. 125 req := &sppb.ReadRequest{ 126 Session: sid, 127 Transaction: ts, 128 Table: table, 129 Index: index, 130 Columns: columns, 131 KeySet: kset, 132 } 133 // Generate partitions. 134 for _, p := range resp.GetPartitions() { 135 partitions = append(partitions, &Partition{ 136 pt: p.PartitionToken, 137 rreq: req, 138 }) 139 } 140 return partitions, err 141} 142 143// PartitionQuery returns a list of Partitions that can be used to execute a 144// query against the database. 
145func (t *BatchReadOnlyTransaction) PartitionQuery(ctx context.Context, statement Statement, opt PartitionOptions) ([]*Partition, error) { 146 sh, ts, err := t.acquire(ctx) 147 if err != nil { 148 return nil, err 149 } 150 sid, client := sh.getID(), sh.getClient() 151 params, paramTypes, err := statement.convertParams() 152 if err != nil { 153 return nil, err 154 } 155 156 // request Partitions 157 req := &sppb.PartitionQueryRequest{ 158 Session: sid, 159 Transaction: ts, 160 Sql: statement.SQL, 161 PartitionOptions: opt.toProto(), 162 Params: params, 163 ParamTypes: paramTypes, 164 } 165 resp, err := client.PartitionQuery(ctx, req) 166 167 // prepare ExecuteSqlRequest 168 r := &sppb.ExecuteSqlRequest{ 169 Session: sid, 170 Transaction: ts, 171 Sql: statement.SQL, 172 Params: params, 173 ParamTypes: paramTypes, 174 } 175 176 // generate Partitions 177 var partitions []*Partition 178 for _, p := range resp.GetPartitions() { 179 partitions = append(partitions, &Partition{ 180 pt: p.PartitionToken, 181 qreq: r, 182 }) 183 } 184 return partitions, err 185} 186 187// release implements txReadEnv.release, noop. 188func (t *BatchReadOnlyTransaction) release(err error) { 189} 190 191// setTimestamp implements txReadEnv.setTimestamp, noop. 192// 193// read timestamp is ready on txn initialization, avoid contending writing to it 194// with future partitions. 195func (t *BatchReadOnlyTransaction) setTimestamp(ts time.Time) { 196} 197 198// Close marks the txn as closed. 199func (t *BatchReadOnlyTransaction) Close() { 200 t.mu.Lock() 201 defer t.mu.Unlock() 202 t.state = txClosed 203} 204 205// Cleanup cleans up all the resources used by this transaction and makes 206// it unusable. Once this method is invoked, the transaction is no longer 207// usable anywhere, including other clients/processes with which this 208// transaction was shared. 209// 210// Calling Cleanup is optional, but recommended. 
If Cleanup is not called, the 211// transaction's resources will be freed when the session expires on the backend 212// and is deleted. For more information about recycled sessions, see 213// https://cloud.google.com/spanner/docs/sessions. 214func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) { 215 t.Close() 216 t.mu.Lock() 217 defer t.mu.Unlock() 218 sh := t.sh 219 if sh == nil { 220 return 221 } 222 t.sh = nil 223 sid, client := sh.getID(), sh.getClient() 224 err := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid}) 225 if err != nil { 226 var logger *log.Logger 227 if sh.session != nil { 228 logger = sh.session.logger 229 } 230 logf(logger, "Failed to delete session %v. Error: %v", sid, err) 231 } 232} 233 234// Execute runs a single Partition obtained from PartitionRead or 235// PartitionQuery. 236func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *RowIterator { 237 var ( 238 sh *sessionHandle 239 err error 240 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) 241 ) 242 if sh, _, err = t.acquire(ctx); err != nil { 243 return &RowIterator{err: err} 244 } 245 client := sh.getClient() 246 if client == nil { 247 // Might happen if transaction is closed in the middle of a API call. 248 return &RowIterator{err: errSessionClosed(sh)} 249 } 250 // Read or query partition. 
251 if p.rreq != nil { 252 p.rreq.PartitionToken = p.pt 253 rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 254 p.rreq.ResumeToken = resumeToken 255 return client.StreamingRead(ctx, p.rreq) 256 } 257 } else { 258 p.qreq.PartitionToken = p.pt 259 rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 260 p.qreq.ResumeToken = resumeToken 261 return client.ExecuteStreamingSql(ctx, p.qreq) 262 } 263 } 264 return stream( 265 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 266 sh.session.logger, 267 rpc, 268 t.setTimestamp, 269 t.release) 270} 271 272// MarshalBinary implements BinaryMarshaler. 273func (tid BatchReadOnlyTransactionID) MarshalBinary() (data []byte, err error) { 274 var buf bytes.Buffer 275 enc := gob.NewEncoder(&buf) 276 if err := enc.Encode(tid.tid); err != nil { 277 return nil, err 278 } 279 if err := enc.Encode(tid.sid); err != nil { 280 return nil, err 281 } 282 if err := enc.Encode(tid.rts); err != nil { 283 return nil, err 284 } 285 return buf.Bytes(), nil 286} 287 288// UnmarshalBinary implements BinaryUnmarshaler. 289func (tid *BatchReadOnlyTransactionID) UnmarshalBinary(data []byte) error { 290 dec := gob.NewDecoder(bytes.NewReader(data)) 291 if err := dec.Decode(&tid.tid); err != nil { 292 return err 293 } 294 if err := dec.Decode(&tid.sid); err != nil { 295 return err 296 } 297 return dec.Decode(&tid.rts) 298} 299 300// MarshalBinary implements BinaryMarshaler. 
func (p Partition) MarshalBinary() (data []byte, err error) {
	// Wire format (gob stream): partition token, a bool that is true for read
	// partitions, then the proto-encoded ReadRequest or ExecuteSqlRequest.
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	if err := enc.Encode(p.pt); err != nil {
		return nil, err
	}
	isReadPartition := p.rreq != nil
	var req proto.Message = p.qreq
	if isReadPartition {
		req = p.rreq
	}
	if err := enc.Encode(isReadPartition); err != nil {
		return nil, err
	}
	reqBytes, err := proto.Marshal(req)
	if err != nil {
		return nil, err
	}
	if err := enc.Encode(reqBytes); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// UnmarshalBinary implements BinaryUnmarshaler. It decodes data produced by
// Partition.MarshalBinary.
//
// On success exactly one of the read/query requests is set, even if p was
// previously populated with the other kind. On error p is left unchanged.
func (p *Partition) UnmarshalBinary(data []byte) error {
	var (
		pt              []byte
		isReadPartition bool
		reqBytes        []byte
	)
	dec := gob.NewDecoder(bytes.NewReader(data))
	if err := dec.Decode(&pt); err != nil {
		return err
	}
	if err := dec.Decode(&isReadPartition); err != nil {
		return err
	}
	if err := dec.Decode(&reqBytes); err != nil {
		return err
	}
	if isReadPartition {
		rreq := &sppb.ReadRequest{}
		if err := proto.Unmarshal(reqBytes, rreq); err != nil {
			return err
		}
		// Clear qreq: Execute prefers rreq, but a stale qreq would still be
		// wrong state on a reused Partition.
		p.pt, p.rreq, p.qreq = pt, rreq, nil
		return nil
	}
	qreq := &sppb.ExecuteSqlRequest{}
	if err := proto.Unmarshal(reqBytes, qreq); err != nil {
		return err
	}
	// Clear rreq: otherwise Execute would pick the stale read request instead
	// of this query.
	p.pt, p.rreq, p.qreq = pt, nil, qreq
	return nil
}