/*
Copyright 2018 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"encoding/gob"
	"log"
	"time"

	"github.com/golang/protobuf/proto"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
)

// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
// arbitrarily large amounts of data from Cloud Spanner databases.
// BatchReadOnlyTransaction partitions a read/query request. The read/query
// request can then be executed independently over each partition while
// observing the same snapshot of the database. A BatchReadOnlyTransaction can
// also be shared across multiple clients by passing around the
// BatchReadOnlyTransactionID and then recreating the transaction using
// Client.BatchReadOnlyTransactionFromID.
//
// Note: if a client is used only to run partitions, you can
// create it using a ClientConfig with both MinOpened and MaxIdle set to
// zero to avoid creating unnecessary sessions. You can also avoid excess
// gRPC channels by setting ClientConfig.NumChannels to the number of
// concurrently active BatchReadOnlyTransactions you expect to have.
type BatchReadOnlyTransaction struct {
	// ReadOnlyTransaction is embedded, so its fields and methods are promoted
	// onto BatchReadOnlyTransaction.
	ReadOnlyTransaction
	// ID is the identifier that can be passed to other clients or processes to
	// re-create this transaction there.
	ID BatchReadOnlyTransactionID
}

// BatchReadOnlyTransactionID is a unique identifier for a
// BatchReadOnlyTransaction.
It can be used to re-create a 50// BatchReadOnlyTransaction on a different machine or process by calling 51// Client.BatchReadOnlyTransactionFromID. 52type BatchReadOnlyTransactionID struct { 53 // unique ID for the transaction. 54 tid transactionID 55 // sid is the id of the Cloud Spanner session used for this transaction. 56 sid string 57 // rts is the read timestamp of this transaction. 58 rts time.Time 59} 60 61// Partition defines a segment of data to be read in a batch read or query. A 62// partition can be serialized and processed across several different machines 63// or processes. 64type Partition struct { 65 pt []byte 66 qreq *sppb.ExecuteSqlRequest 67 rreq *sppb.ReadRequest 68} 69 70// PartitionOptions specifies options for a PartitionQueryRequest and 71// PartitionReadRequest. See 72// https://godoc.org/google.golang.org/genproto/googleapis/spanner/v1#PartitionOptions 73// for more details. 74type PartitionOptions struct { 75 // The desired data size for each partition generated. 76 PartitionBytes int64 77 // The desired maximum number of partitions to return. 78 MaxPartitions int64 79} 80 81// toProto converts a spanner.PartitionOptions into a sppb.PartitionOptions 82func (opt PartitionOptions) toProto() *sppb.PartitionOptions { 83 return &sppb.PartitionOptions{ 84 PartitionSizeBytes: opt.PartitionBytes, 85 MaxPartitions: opt.MaxPartitions, 86 } 87} 88 89// PartitionRead returns a list of Partitions that can be used to read rows from 90// the database. These partitions can be executed across multiple processes, 91// even across different machines. The partition size and count hints can be 92// configured using PartitionOptions. 
93func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { 94 return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt) 95} 96 97// PartitionReadUsingIndex returns a list of Partitions that can be used to read 98// rows from the database using an index. 99func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { 100 sh, ts, err := t.acquire(ctx) 101 if err != nil { 102 return nil, err 103 } 104 sid, client := sh.getID(), sh.getClient() 105 var ( 106 kset *sppb.KeySet 107 resp *sppb.PartitionResponse 108 partitions []*Partition 109 ) 110 kset, err = keys.keySetProto() 111 // Request partitions. 112 if err != nil { 113 return nil, err 114 } 115 resp, err = client.PartitionRead(ctx, &sppb.PartitionReadRequest{ 116 Session: sid, 117 Transaction: ts, 118 Table: table, 119 Index: index, 120 Columns: columns, 121 KeySet: kset, 122 PartitionOptions: opt.toProto(), 123 }) 124 // Prepare ReadRequest. 125 req := &sppb.ReadRequest{ 126 Session: sid, 127 Transaction: ts, 128 Table: table, 129 Index: index, 130 Columns: columns, 131 KeySet: kset, 132 } 133 // Generate partitions. 134 for _, p := range resp.GetPartitions() { 135 partitions = append(partitions, &Partition{ 136 pt: p.PartitionToken, 137 rreq: req, 138 }) 139 } 140 return partitions, err 141} 142 143// PartitionQuery returns a list of Partitions that can be used to execute a 144// query against the database. 145func (t *BatchReadOnlyTransaction) PartitionQuery(ctx context.Context, statement Statement, opt PartitionOptions) ([]*Partition, error) { 146 return t.partitionQuery(ctx, statement, opt, t.ReadOnlyTransaction.txReadOnly.qo) 147} 148 149// PartitionQueryWithOptions returns a list of Partitions that can be used to 150// execute a query against the database. 
The sql query execution will be 151// optimized based on the given query options. 152func (t *BatchReadOnlyTransaction) PartitionQueryWithOptions(ctx context.Context, statement Statement, opt PartitionOptions, qOpts QueryOptions) ([]*Partition, error) { 153 return t.partitionQuery(ctx, statement, opt, t.ReadOnlyTransaction.txReadOnly.qo.merge(qOpts)) 154} 155 156func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement Statement, opt PartitionOptions, qOpts QueryOptions) ([]*Partition, error) { 157 sh, ts, err := t.acquire(ctx) 158 if err != nil { 159 return nil, err 160 } 161 sid, client := sh.getID(), sh.getClient() 162 params, paramTypes, err := statement.convertParams() 163 if err != nil { 164 return nil, err 165 } 166 167 // request Partitions 168 req := &sppb.PartitionQueryRequest{ 169 Session: sid, 170 Transaction: ts, 171 Sql: statement.SQL, 172 PartitionOptions: opt.toProto(), 173 Params: params, 174 ParamTypes: paramTypes, 175 } 176 resp, err := client.PartitionQuery(ctx, req) 177 178 // prepare ExecuteSqlRequest 179 r := &sppb.ExecuteSqlRequest{ 180 Session: sid, 181 Transaction: ts, 182 Sql: statement.SQL, 183 Params: params, 184 ParamTypes: paramTypes, 185 QueryOptions: qOpts.Options, 186 } 187 188 // generate Partitions 189 var partitions []*Partition 190 for _, p := range resp.GetPartitions() { 191 partitions = append(partitions, &Partition{ 192 pt: p.PartitionToken, 193 qreq: r, 194 }) 195 } 196 return partitions, err 197} 198 199// release implements txReadEnv.release, noop. 200func (t *BatchReadOnlyTransaction) release(err error) { 201} 202 203// setTimestamp implements txReadEnv.setTimestamp, noop. 204// 205// read timestamp is ready on txn initialization, avoid contending writing to it 206// with future partitions. 207func (t *BatchReadOnlyTransaction) setTimestamp(ts time.Time) { 208} 209 210// Close marks the txn as closed. 
211func (t *BatchReadOnlyTransaction) Close() { 212 t.mu.Lock() 213 defer t.mu.Unlock() 214 t.state = txClosed 215} 216 217// Cleanup cleans up all the resources used by this transaction and makes 218// it unusable. Once this method is invoked, the transaction is no longer 219// usable anywhere, including other clients/processes with which this 220// transaction was shared. 221// 222// Calling Cleanup is optional, but recommended. If Cleanup is not called, the 223// transaction's resources will be freed when the session expires on the backend 224// and is deleted. For more information about recycled sessions, see 225// https://cloud.google.com/spanner/docs/sessions. 226func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) { 227 t.Close() 228 t.mu.Lock() 229 defer t.mu.Unlock() 230 sh := t.sh 231 if sh == nil { 232 return 233 } 234 t.sh = nil 235 sid, client := sh.getID(), sh.getClient() 236 err := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid}) 237 if err != nil { 238 var logger *log.Logger 239 if sh.session != nil { 240 logger = sh.session.logger 241 } 242 logf(logger, "Failed to delete session %v. Error: %v", sid, err) 243 } 244} 245 246// Execute runs a single Partition obtained from PartitionRead or 247// PartitionQuery. 248func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *RowIterator { 249 var ( 250 sh *sessionHandle 251 err error 252 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error) 253 ) 254 if sh, _, err = t.acquire(ctx); err != nil { 255 return &RowIterator{err: err} 256 } 257 client := sh.getClient() 258 if client == nil { 259 // Might happen if transaction is closed in the middle of a API call. 260 return &RowIterator{err: errSessionClosed(sh)} 261 } 262 // Read or query partition. 
263 if p.rreq != nil { 264 p.rreq.PartitionToken = p.pt 265 rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 266 p.rreq.ResumeToken = resumeToken 267 return client.StreamingRead(ctx, p.rreq) 268 } 269 } else { 270 p.qreq.PartitionToken = p.pt 271 rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 272 p.qreq.ResumeToken = resumeToken 273 return client.ExecuteStreamingSql(ctx, p.qreq) 274 } 275 } 276 return stream( 277 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 278 sh.session.logger, 279 rpc, 280 t.setTimestamp, 281 t.release) 282} 283 284// MarshalBinary implements BinaryMarshaler. 285func (tid BatchReadOnlyTransactionID) MarshalBinary() (data []byte, err error) { 286 var buf bytes.Buffer 287 enc := gob.NewEncoder(&buf) 288 if err := enc.Encode(tid.tid); err != nil { 289 return nil, err 290 } 291 if err := enc.Encode(tid.sid); err != nil { 292 return nil, err 293 } 294 if err := enc.Encode(tid.rts); err != nil { 295 return nil, err 296 } 297 return buf.Bytes(), nil 298} 299 300// UnmarshalBinary implements BinaryUnmarshaler. 301func (tid *BatchReadOnlyTransactionID) UnmarshalBinary(data []byte) error { 302 dec := gob.NewDecoder(bytes.NewReader(data)) 303 if err := dec.Decode(&tid.tid); err != nil { 304 return err 305 } 306 if err := dec.Decode(&tid.sid); err != nil { 307 return err 308 } 309 return dec.Decode(&tid.rts) 310} 311 312// MarshalBinary implements BinaryMarshaler. 
313func (p Partition) MarshalBinary() (data []byte, err error) { 314 var buf bytes.Buffer 315 enc := gob.NewEncoder(&buf) 316 if err := enc.Encode(p.pt); err != nil { 317 return nil, err 318 } 319 var isReadPartition bool 320 var req proto.Message 321 if p.rreq != nil { 322 isReadPartition = true 323 req = p.rreq 324 } else { 325 isReadPartition = false 326 req = p.qreq 327 } 328 if err := enc.Encode(isReadPartition); err != nil { 329 return nil, err 330 } 331 if data, err = proto.Marshal(req); err != nil { 332 return nil, err 333 } 334 if err := enc.Encode(data); err != nil { 335 return nil, err 336 } 337 return buf.Bytes(), nil 338} 339 340// UnmarshalBinary implements BinaryUnmarshaler. 341func (p *Partition) UnmarshalBinary(data []byte) error { 342 var ( 343 isReadPartition bool 344 d []byte 345 err error 346 ) 347 dec := gob.NewDecoder(bytes.NewReader(data)) 348 if err := dec.Decode(&p.pt); err != nil { 349 return err 350 } 351 if err := dec.Decode(&isReadPartition); err != nil { 352 return err 353 } 354 if err := dec.Decode(&d); err != nil { 355 return err 356 } 357 if isReadPartition { 358 p.rreq = &sppb.ReadRequest{} 359 err = proto.Unmarshal(d, p.rreq) 360 } else { 361 p.qreq = &sppb.ExecuteSqlRequest{} 362 err = proto.Unmarshal(d, p.qreq) 363 } 364 return err 365} 366