1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "bytes" 21 "context" 22 "io" 23 "log" 24 "sync/atomic" 25 "time" 26 27 "cloud.google.com/go/internal/protostruct" 28 "cloud.google.com/go/internal/trace" 29 "github.com/golang/protobuf/proto" 30 proto3 "github.com/golang/protobuf/ptypes/struct" 31 "github.com/googleapis/gax-go/v2" 32 "google.golang.org/api/iterator" 33 sppb "google.golang.org/genproto/googleapis/spanner/v1" 34 "google.golang.org/grpc/codes" 35) 36 37// streamingReceiver is the interface for receiving data from a client side 38// stream. 39type streamingReceiver interface { 40 Recv() (*sppb.PartialResultSet, error) 41} 42 43// errEarlyReadEnd returns error for read finishes when gRPC stream is still 44// active. 45func errEarlyReadEnd() error { 46 return spannerErrorf(codes.FailedPrecondition, "read completed with active stream") 47} 48 49// stream is the internal fault tolerant method for streaming data from Cloud 50// Spanner. 51func stream( 52 ctx context.Context, 53 logger *log.Logger, 54 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), 55 setTimestamp func(time.Time), 56 release func(error), 57) *RowIterator { 58 return streamWithReplaceSessionFunc( 59 ctx, 60 logger, 61 rpc, 62 nil, 63 setTimestamp, 64 release, 65 ) 66} 67 68// this stream method will automatically retry the stream on a new session if 69// the replaceSessionFunc function has been defined. 
This function should only be 70// used for single-use transactions. 71func streamWithReplaceSessionFunc( 72 ctx context.Context, 73 logger *log.Logger, 74 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), 75 replaceSession func(ctx context.Context) error, 76 setTimestamp func(time.Time), 77 release func(error), 78) *RowIterator { 79 ctx, cancel := context.WithCancel(ctx) 80 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator") 81 return &RowIterator{ 82 streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession), 83 rowd: &partialResultSetDecoder{}, 84 setTimestamp: setTimestamp, 85 release: release, 86 cancel: cancel, 87 } 88} 89 90// RowIterator is an iterator over Rows. 91type RowIterator struct { 92 // The plan for the query. Available after RowIterator.Next returns 93 // iterator.Done if QueryWithStats was called. 94 QueryPlan *sppb.QueryPlan 95 96 // Execution statistics for the query. Available after RowIterator.Next 97 // returns iterator.Done if QueryWithStats was called. 98 QueryStats map[string]interface{} 99 100 // For a DML statement, the number of rows affected. For PDML, this is a 101 // lower bound. Available for DML statements after RowIterator.Next returns 102 // iterator.Done. 103 RowCount int64 104 105 // The metadata of the results of the query. The metadata are available 106 // after the first call to RowIterator.Next(), unless the first call to 107 // RowIterator.Next() returned an error that is not equal to iterator.Done. 108 Metadata *sppb.ResultSetMetadata 109 110 streamd *resumableStreamDecoder 111 rowd *partialResultSetDecoder 112 setTimestamp func(time.Time) 113 release func(error) 114 cancel func() 115 err error 116 rows []*Row 117 sawStats bool 118} 119 120// Next returns the next result. Its second return value is iterator.Done if 121// there are no more results. Once Next returns Done, all subsequent calls 122// will return Done. 
123func (r *RowIterator) Next() (*Row, error) { 124 if r.err != nil { 125 return nil, r.err 126 } 127 for len(r.rows) == 0 && r.streamd.next() { 128 prs := r.streamd.get() 129 if prs.Stats != nil { 130 r.sawStats = true 131 r.QueryPlan = prs.Stats.QueryPlan 132 r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats) 133 if prs.Stats.RowCount != nil { 134 rc, err := extractRowCount(prs.Stats) 135 if err != nil { 136 return nil, err 137 } 138 r.RowCount = rc 139 } 140 } 141 var metadata *sppb.ResultSetMetadata 142 r.rows, metadata, r.err = r.rowd.add(prs) 143 if metadata != nil { 144 r.Metadata = metadata 145 } 146 if r.err != nil { 147 return nil, r.err 148 } 149 if !r.rowd.ts.IsZero() && r.setTimestamp != nil { 150 r.setTimestamp(r.rowd.ts) 151 r.setTimestamp = nil 152 } 153 } 154 if len(r.rows) > 0 { 155 row := r.rows[0] 156 r.rows = r.rows[1:] 157 return row, nil 158 } 159 if err := r.streamd.lastErr(); err != nil { 160 r.err = ToSpannerError(err) 161 } else if !r.rowd.done() { 162 r.err = errEarlyReadEnd() 163 } else { 164 r.err = iterator.Done 165 } 166 return nil, r.err 167} 168 169func extractRowCount(stats *sppb.ResultSetStats) (int64, error) { 170 if stats.RowCount == nil { 171 return 0, spannerErrorf(codes.Internal, "missing RowCount") 172 } 173 switch rc := stats.RowCount.(type) { 174 case *sppb.ResultSetStats_RowCountExact: 175 return rc.RowCountExact, nil 176 case *sppb.ResultSetStats_RowCountLowerBound: 177 return rc.RowCountLowerBound, nil 178 default: 179 return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount) 180 } 181} 182 183// Do calls the provided function once in sequence for each row in the 184// iteration. If the function returns a non-nil error, Do immediately returns 185// that error. 186// 187// If there are no rows in the iterator, Do will return nil without calling the 188// provided function. 189// 190// Do always calls Stop on the iterator. 
191func (r *RowIterator) Do(f func(r *Row) error) error { 192 defer r.Stop() 193 for { 194 row, err := r.Next() 195 switch err { 196 case iterator.Done: 197 return nil 198 case nil: 199 if err = f(row); err != nil { 200 return err 201 } 202 default: 203 return err 204 } 205 } 206} 207 208// Stop terminates the iteration. It should be called after you finish using the 209// iterator. 210func (r *RowIterator) Stop() { 211 if r.streamd != nil { 212 if r.err != nil && r.err != iterator.Done { 213 defer trace.EndSpan(r.streamd.ctx, r.err) 214 } else { 215 defer trace.EndSpan(r.streamd.ctx, nil) 216 } 217 } 218 if r.cancel != nil { 219 r.cancel() 220 } 221 if r.release != nil { 222 r.release(r.err) 223 if r.err == nil { 224 r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop") 225 } 226 r.release = nil 227 } 228} 229 230// partialResultQueue implements a simple FIFO queue. The zero value is a valid 231// queue. 232type partialResultQueue struct { 233 q []*sppb.PartialResultSet 234 first int 235 last int 236 n int // number of elements in queue 237} 238 239// empty returns if the partialResultQueue is empty. 240func (q *partialResultQueue) empty() bool { 241 return q.n == 0 242} 243 244// errEmptyQueue returns error for dequeuing an empty queue. 245func errEmptyQueue() error { 246 return spannerErrorf(codes.OutOfRange, "empty partialResultQueue") 247} 248 249// peekLast returns the last item in partialResultQueue; if the queue 250// is empty, it returns error. 251func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) { 252 if q.empty() { 253 return nil, errEmptyQueue() 254 } 255 return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil 256} 257 258// push adds an item to the tail of partialResultQueue. 
259func (q *partialResultQueue) push(r *sppb.PartialResultSet) { 260 if q.q == nil { 261 q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */) 262 } 263 if q.n == cap(q.q) { 264 buf := make([]*sppb.PartialResultSet, cap(q.q)*2) 265 for i := 0; i < q.n; i++ { 266 buf[i] = q.q[(q.first+i)%cap(q.q)] 267 } 268 q.q = buf 269 q.first = 0 270 q.last = q.n 271 } 272 q.q[q.last] = r 273 q.last = (q.last + 1) % cap(q.q) 274 q.n++ 275} 276 277// pop removes an item from the head of partialResultQueue and returns it. 278func (q *partialResultQueue) pop() *sppb.PartialResultSet { 279 if q.n == 0 { 280 return nil 281 } 282 r := q.q[q.first] 283 q.q[q.first] = nil 284 q.first = (q.first + 1) % cap(q.q) 285 q.n-- 286 return r 287} 288 289// clear empties partialResultQueue. 290func (q *partialResultQueue) clear() { 291 *q = partialResultQueue{} 292} 293 294// dump retrieves all items from partialResultQueue and return them in a slice. 295// It is used only in tests. 296func (q *partialResultQueue) dump() []*sppb.PartialResultSet { 297 var dq []*sppb.PartialResultSet 298 for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) { 299 dq = append(dq, q.q[i]) 300 } 301 return dq 302} 303 304// resumableStreamDecoderState encodes resumableStreamDecoder's status. See also 305// the comments for resumableStreamDecoder.Next. 306type resumableStreamDecoderState int 307 308const ( 309 unConnected resumableStreamDecoderState = iota // 0 310 queueingRetryable // 1 311 queueingUnretryable // 2 312 aborted // 3 313 finished // 4 314) 315 316// resumableStreamDecoder provides a resumable interface for receiving 317// sppb.PartialResultSet(s) from a given query wrapped by 318// resumableStreamDecoder.rpc(). 319type resumableStreamDecoder struct { 320 // state is the current status of resumableStreamDecoder, see also 321 // the comments for resumableStreamDecoder.Next. 
322 state resumableStreamDecoderState 323 324 // stateWitness when non-nil is called to observe state change, 325 // used for testing. 326 stateWitness func(resumableStreamDecoderState) 327 328 // ctx is the caller's context, used for cancel/timeout Next(). 329 ctx context.Context 330 331 // rpc is a factory of streamingReceiver, which might resume 332 // a previous stream from the point encoded in restartToken. 333 // rpc is always a wrapper of a Cloud Spanner query which is 334 // resumable. 335 rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error) 336 337 // replaceSessionFunc is a function that can be used to replace the session 338 // that is being used to execute the read operation. This function should 339 // only be defined for single-use transactions that can safely retry the 340 // read operation on a new session. If this function is nil, the stream 341 // does not support retrying the query on a new session. 342 replaceSessionFunc func(ctx context.Context) error 343 344 // logger is the logger to use. 345 logger *log.Logger 346 347 // stream is the current RPC streaming receiver. 348 stream streamingReceiver 349 350 // q buffers received yet undecoded partial results. 351 q partialResultQueue 352 353 // bytesBetweenResumeTokens is the proxy of the byte size of 354 // PartialResultSets being queued between two resume tokens. Once 355 // bytesBetweenResumeTokens is greater than maxBytesBetweenResumeTokens, 356 // resumableStreamDecoder goes into queueingUnretryable state. 357 bytesBetweenResumeTokens int32 358 359 // maxBytesBetweenResumeTokens is the max number of bytes that can be 360 // buffered between two resume tokens. It is always copied from the global 361 // maxBytesBetweenResumeTokens atomically. 362 maxBytesBetweenResumeTokens int32 363 364 // np is the next sppb.PartialResultSet ready to be returned 365 // to caller of resumableStreamDecoder.Get(). 
366 np *sppb.PartialResultSet 367 368 // resumeToken stores the resume token that resumableStreamDecoder has 369 // last revealed to caller. 370 resumeToken []byte 371 372 // err is the last error resumableStreamDecoder has encountered so far. 373 err error 374 375 // backoff is used for the retry settings 376 backoff gax.Backoff 377} 378 379// newResumableStreamDecoder creates a new resumeableStreamDecoder instance. 380// Parameter rpc should be a function that creates a new stream beginning at the 381// restartToken if non-nil. 382func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder { 383 return &resumableStreamDecoder{ 384 ctx: ctx, 385 logger: logger, 386 rpc: rpc, 387 replaceSessionFunc: replaceSession, 388 maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens), 389 backoff: DefaultRetryBackoff, 390 } 391} 392 393// changeState fulfills state transition for resumableStateDecoder. 394func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) { 395 if d.state == queueingRetryable && d.state != target { 396 // Reset bytesBetweenResumeTokens because it is only meaningful/changed 397 // under queueingRetryable state. 398 d.bytesBetweenResumeTokens = 0 399 } 400 d.state = target 401 if d.stateWitness != nil { 402 d.stateWitness(target) 403 } 404} 405 406// isNewResumeToken returns if the observed resume token is different from 407// the one returned from server last time. 408func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool { 409 if rt == nil { 410 return false 411 } 412 if bytes.Equal(rt, d.resumeToken) { 413 return false 414 } 415 return true 416} 417 418// Next advances to the next available partial result set. If error or no 419// more, returns false, call Err to determine if an error was encountered. 
// The following diagram illustrates the state machine of resumableStreamDecoder
// that next() implements. Note that state transitions can only be triggered by
// RPC activities.
/*
        rpc() fails retryable
      +---------+
      |         |    rpc() fails unretryable/ctx timeouts or cancelled
      |         |   +------------------------------------------------+
      |         |   |                                                |
      |         v   |                                                v
      |     +---+---+---+                       +--------+    +------+--+
      +-----+unConnected|                       |finished|    | aborted |<----+
            |           |                       ++-----+-+    +------+--+     |
            +---+----+--+                        ^     ^             ^        |
                |    ^                           |     |             |        |
                |    |                           |     |     recv() fails     |
                |    |                           |     |             |        |
                |    |recv() fails retryable     |     |             |        |
                |    |with valid ctx             |     |             |        |
                |    |                           |     |             |        |
      rpc() succeeds |   +-----------------------+     |             |        |
                |    |   |         recv EOF         recv EOF         |        |
                |    |   |                             |             |        |
                v    |   |     Queue size exceeds      |             |        |
            +---+----+---+----+threshold       +-------+-----------+ |        |
+---------->+                 +--------------->+                   +-+        |
|           |queueingRetryable|                |queueingUnretryable|          |
|           |                 +<---------------+                   |          |
|           +---+----------+--+ pop() returns  +--+----+-----------+          |
|               |          |    resume token      |    ^                      |
|               |          |                      |    |                      |
|               |          |                      |    |                      |
+---------------+          |                      |    |                      |
   recv() succeeds         |                      +----+                      |
                           |                      recv() succeeds             |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           +--------------------------------------------------+
                                               recv() fails unretryable

*/
var (
	// maxBytesBetweenResumeTokens is the maximum amount of bytes that
	// resumableStreamDecoder in queueingRetryable state can use to queue
	// PartialResultSets before getting into queueingUnretryable state.
	maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024)
)

// next drives the state machine shown in the diagram above until a
// PartialResultSet is ready or the stream ends. It returns true when a result
// has been stored for retrieval with get, and false when the decoder is
// aborted, is finished with nothing left buffered, or is in an unexpected
// state. lastErr reports any error recorded along the way.
func (d *resumableStreamDecoder) next() bool {
	// The retryer treats Unavailable and Internal codes as retryable, using
	// the decoder's backoff settings.
	retryer := onCodes(d.backoff, codes.Unavailable, codes.Internal)
	for {
		switch d.state {
		case unConnected:
			// If no gRPC stream is available, try to initiate one.
			d.stream, d.err = d.rpc(d.ctx, d.resumeToken)
			if d.err == nil {
				d.changeState(queueingRetryable)
				continue
			}
			delay, shouldRetry := retryer.Retry(d.err)
			if !shouldRetry {
				d.changeState(aborted)
				continue
			}
			trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay)
			if err := gax.Sleep(d.ctx, delay); err == nil {
				// Be explicit about state transition, although the
				// state doesn't actually change. State transition
				// will be triggered only by RPC activity, regardless of
				// whether there is an actual state change or not.
				d.changeState(unConnected)
			} else {
				// The sleep was interrupted (gax.Sleep returned an error);
				// give up with that error.
				d.err = err
				d.changeState(aborted)
			}
			continue

		case queueingRetryable:
			fallthrough
		case queueingUnretryable:
			// Inspect the most recently received item, if there is one.
			last, err := d.q.peekLast()
			if err != nil {
				// Only the case that receiving queue is empty could cause
				// peekLast to return error and in such case, we should try to
				// receive from stream.
				d.tryRecv(retryer)
				continue
			}
			if d.isNewResumeToken(last.ResumeToken) {
				// Got new resume token, return buffered sppb.PartialResultSets
				// to caller.
				d.np = d.q.pop()
				if d.q.empty() {
					d.bytesBetweenResumeTokens = 0
					// The new resume token was just popped out from queue,
					// record it.
					d.resumeToken = d.np.ResumeToken
					d.changeState(queueingRetryable)
				}
				return true
			}
			if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable {
				// Too much has been buffered without a new resume token; stop
				// holding results back.
				d.changeState(queueingUnretryable)
				continue
			}
			if d.state == queueingUnretryable {
				// When there is no resume token observed, only yield
				// sppb.PartialResultSets to caller under queueingUnretryable
				// state.
				d.np = d.q.pop()
				return true
			}
			// Needs to receive more from gRPC stream till a new resume token
			// is observed.
			d.tryRecv(retryer)
			continue
		case aborted:
			// Discard all pending items because none of them should be yielded
			// to caller.
			d.q.clear()
			return false
		case finished:
			// If query has finished, check if there are still buffered messages.
			if d.q.empty() {
				// No buffered PartialResultSet.
				return false
			}
			// Although query has finished, there are still buffered
			// PartialResultSets.
			d.np = d.q.pop()
			return true

		default:
			logf(d.logger, "Unexpected resumableStreamDecoder.state: %v", d.state)
			return false
		}
	}
}

// tryRecv attempts to receive a PartialResultSet from the gRPC stream. On
// success the result is queued. On io.EOF the decoder moves to finished. On
// any other error it either resets to unConnected so the stream is
// re-established, or moves to aborted.
func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
	var res *sppb.PartialResultSet
	res, d.err = d.stream.Recv()
	if d.err == nil {
		d.q.push(res)
		if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) {
			// Account for data buffered without a new resume token.
			d.bytesBetweenResumeTokens += int32(proto.Size(res))
		}
		// Signal the RPC activity even though the state stays the same.
		d.changeState(d.state)
		return
	}
	if d.err == io.EOF {
		// End of stream is not an error for callers.
		d.err = nil
		d.changeState(finished)
		return
	}
	if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil {
		// A 'Session not found' error occurred before we received a resume
		// token and a replaceSessionFunc function is defined. Try to restart
		// the stream on a new session.
		if err := d.replaceSessionFunc(d.ctx); err != nil {
			d.err = err
			d.changeState(aborted)
			return
		}
	} else {
		delay, shouldRetry := retryer.Retry(d.err)
		// A retry is only attempted from the queueingRetryable state.
		if !shouldRetry || d.state != queueingRetryable {
			d.changeState(aborted)
			return
		}
		if err := gax.Sleep(d.ctx, delay); err != nil {
			d.err = err
			d.changeState(aborted)
			return
		}
	}
	// Clear error and retry the stream.
	d.err = nil
	// Discard all queue items (none have resume tokens).
	d.q.clear()
	d.stream = nil
	d.changeState(unConnected)
}

// get returns the most recent PartialResultSet generated by a call to next.
func (d *resumableStreamDecoder) get() *sppb.PartialResultSet {
	return d.np
}

// lastErr returns the last non-EOF error encountered.
func (d *resumableStreamDecoder) lastErr() error {
	return d.err
}

// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner
// Rows.
type partialResultSetDecoder struct {
	row     Row               // row currently being assembled
	tx      *sppb.Transaction // transaction taken from the result metadata
	chunked bool              // if true, next value should be merged with last values entry.
	ts      time.Time         // read timestamp
}

// yield checks we have a complete row, and if so returns it. A row is not
// complete if it doesn't have enough columns, or if this is a chunked response
// and there are no further values to process.
func (p *partialResultSetDecoder) yield(chunked, last bool) *Row {
	if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) {
		// When partialResultSetDecoder gets enough number of Column values.
		// There are two cases that a new Row should be yield:
		//
		//   1. The incoming PartialResultSet is not chunked;
		//   2. The incoming PartialResultSet is chunked, but the
		//      proto3.Value being merged is not the last one in
		//      the PartialResultSet.
		//
		// Use a fresh Row to simplify clients that want to use yielded results
		// after the next row is retrieved. Note that fields is never changed
		// so it doesn't need to be copied.
		fresh := Row{
			fields: p.row.fields,
			vals:   make([]*proto3.Value, len(p.row.vals)),
		}
		copy(fresh.vals, p.row.vals)
		p.row.vals = p.row.vals[:0] // empty and reuse slice
		return &fresh
	}
	return nil
}

// errChunkedEmptyRow returns error for a chunked PartialResultSet that has to
// be merged while no value of the current row is buffered.
657func errChunkedEmptyRow() error { 658 return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row") 659} 660 661// add tries to merge a new PartialResultSet into buffered Row. It returns any 662// rows that have been completed as a result. 663func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, *sppb.ResultSetMetadata, error) { 664 var rows []*Row 665 if r.Metadata != nil { 666 // Metadata should only be returned in the first result. 667 if p.row.fields == nil { 668 p.row.fields = r.Metadata.RowType.Fields 669 } 670 if p.tx == nil && r.Metadata.Transaction != nil { 671 p.tx = r.Metadata.Transaction 672 if p.tx.ReadTimestamp != nil { 673 p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos)) 674 } 675 } 676 } 677 if len(r.Values) == 0 { 678 return nil, r.Metadata, nil 679 } 680 if p.chunked { 681 p.chunked = false 682 // Try to merge first value in r.Values into uncompleted row. 683 last := len(p.row.vals) - 1 684 if last < 0 { // confidence check 685 return nil, nil, errChunkedEmptyRow() 686 } 687 var err error 688 // If p is chunked, then we should always try to merge p.last with 689 // r.first. 690 if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil { 691 return nil, r.Metadata, err 692 } 693 r.Values = r.Values[1:] 694 // Merge is done, try to yield a complete Row. 695 if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil { 696 rows = append(rows, row) 697 } 698 } 699 for i, v := range r.Values { 700 // The rest values in r can be appened into p directly. 701 p.row.vals = append(p.row.vals, v) 702 // Again, check to see if a complete Row can be yielded because of the 703 // newly added value. 704 if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil { 705 rows = append(rows, row) 706 } 707 } 708 if r.ChunkedValue { 709 // After dealing with all values in r, if r is chunked then p must be 710 // also chunked. 
711 p.chunked = true 712 } 713 return rows, r.Metadata, nil 714} 715 716// isMergeable returns if a protobuf Value can be potentially merged with other 717// protobuf Values. 718func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool { 719 switch a.Kind.(type) { 720 case *proto3.Value_StringValue: 721 return true 722 case *proto3.Value_ListValue: 723 return true 724 default: 725 return false 726 } 727} 728 729// errIncompatibleMergeTypes returns error for incompatible protobuf types that 730// cannot be merged by partialResultSetDecoder. 731func errIncompatibleMergeTypes(a, b *proto3.Value) error { 732 return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind) 733} 734 735// errUnsupportedMergeType returns error for protobuf type that cannot be merged 736// to other protobufs. 737func errUnsupportedMergeType(a *proto3.Value) error { 738 return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind) 739} 740 741// merge tries to combine two protobuf Values if possible. 742func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) { 743 var err error 744 typeErr := errIncompatibleMergeTypes(a, b) 745 switch t := a.Kind.(type) { 746 case *proto3.Value_StringValue: 747 s, ok := b.Kind.(*proto3.Value_StringValue) 748 if !ok { 749 return nil, typeErr 750 } 751 return &proto3.Value{ 752 Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue}, 753 }, nil 754 case *proto3.Value_ListValue: 755 l, ok := b.Kind.(*proto3.Value_ListValue) 756 if !ok { 757 return nil, typeErr 758 } 759 if l.ListValue == nil || len(l.ListValue.Values) <= 0 { 760 // b is an empty list, just return a. 761 return a, nil 762 } 763 if t.ListValue == nil || len(t.ListValue.Values) <= 0 { 764 // a is an empty list, just return b. 
765 return b, nil 766 } 767 if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) { 768 // When the last item in a is of type String, List or Struct 769 // (encoded into List by Cloud Spanner), try to Merge last item in 770 // a and first item in b. 771 t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0]) 772 if err != nil { 773 return nil, err 774 } 775 l.ListValue.Values = l.ListValue.Values[1:] 776 } 777 return &proto3.Value{ 778 Kind: &proto3.Value_ListValue{ 779 ListValue: &proto3.ListValue{ 780 Values: append(t.ListValue.Values, l.ListValue.Values...), 781 }, 782 }, 783 }, nil 784 default: 785 return nil, errUnsupportedMergeType(a) 786 } 787 788} 789 790// Done returns if partialResultSetDecoder has already done with all buffered 791// values. 792func (p *partialResultSetDecoder) done() bool { 793 // There is no explicit end of stream marker, but ending part way through a 794 // row is obviously bad, or ending with the last column still awaiting 795 // completion. 796 return len(p.row.vals) == 0 && !p.chunked 797} 798