1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "bytes" 21 "context" 22 "io" 23 "log" 24 "sync/atomic" 25 "time" 26 27 "cloud.google.com/go/internal/protostruct" 28 "cloud.google.com/go/internal/trace" 29 "github.com/golang/protobuf/proto" 30 proto3 "github.com/golang/protobuf/ptypes/struct" 31 "github.com/googleapis/gax-go/v2" 32 "google.golang.org/api/iterator" 33 sppb "google.golang.org/genproto/googleapis/spanner/v1" 34 "google.golang.org/grpc/codes" 35) 36 37// streamingReceiver is the interface for receiving data from a client side 38// stream. 39type streamingReceiver interface { 40 Recv() (*sppb.PartialResultSet, error) 41} 42 43// errEarlyReadEnd returns error for read finishes when gRPC stream is still 44// active. 45func errEarlyReadEnd() error { 46 return spannerErrorf(codes.FailedPrecondition, "read completed with active stream") 47} 48 49// stream is the internal fault tolerant method for streaming data from Cloud 50// Spanner. 51func stream( 52 ctx context.Context, 53 logger *log.Logger, 54 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), 55 setTimestamp func(time.Time), 56 release func(error), 57) *RowIterator { 58 return streamWithReplaceSessionFunc( 59 ctx, 60 logger, 61 rpc, 62 nil, 63 setTimestamp, 64 release, 65 ) 66} 67 68// this stream method will automatically retry the stream on a new session if 69// the replaceSessionFunc function has been defined. This function should only be 70// used for single-use transactions. 71func streamWithReplaceSessionFunc( 72 ctx context.Context, 73 logger *log.Logger, 74 rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), 75 replaceSession func(ctx context.Context) error, 76 setTimestamp func(time.Time), 77 release func(error), 78) *RowIterator { 79 ctx, cancel := context.WithCancel(ctx) 80 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator") 81 return &RowIterator{ 82 streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession), 83 rowd: &partialResultSetDecoder{}, 84 setTimestamp: setTimestamp, 85 release: release, 86 cancel: cancel, 87 } 88} 89 90// RowIterator is an iterator over Rows. 91type RowIterator struct { 92 // The plan for the query. Available after RowIterator.Next returns 93 // iterator.Done if QueryWithStats was called. 94 QueryPlan *sppb.QueryPlan 95 96 // Execution statistics for the query. Available after RowIterator.Next 97 // returns iterator.Done if QueryWithStats was called. 98 QueryStats map[string]interface{} 99 100 // For a DML statement, the number of rows affected. For PDML, this is a 101 // lower bound. Available for DML statements after RowIterator.Next returns 102 // iterator.Done. 103 RowCount int64 104 105 streamd *resumableStreamDecoder 106 rowd *partialResultSetDecoder 107 setTimestamp func(time.Time) 108 release func(error) 109 cancel func() 110 err error 111 rows []*Row 112 sawStats bool 113} 114 115// Next returns the next result. Its second return value is iterator.Done if 116// there are no more results. Once Next returns Done, all subsequent calls 117// will return Done. 118func (r *RowIterator) Next() (*Row, error) { 119 if r.err != nil { 120 return nil, r.err 121 } 122 for len(r.rows) == 0 && r.streamd.next() { 123 prs := r.streamd.get() 124 if prs.Stats != nil { 125 r.sawStats = true 126 r.QueryPlan = prs.Stats.QueryPlan 127 r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats) 128 if prs.Stats.RowCount != nil { 129 rc, err := extractRowCount(prs.Stats) 130 if err != nil { 131 return nil, err 132 } 133 r.RowCount = rc 134 } 135 } 136 r.rows, r.err = r.rowd.add(prs) 137 if r.err != nil { 138 return nil, r.err 139 } 140 if !r.rowd.ts.IsZero() && r.setTimestamp != nil { 141 r.setTimestamp(r.rowd.ts) 142 r.setTimestamp = nil 143 } 144 } 145 if len(r.rows) > 0 { 146 row := r.rows[0] 147 r.rows = r.rows[1:] 148 return row, nil 149 } 150 if err := r.streamd.lastErr(); err != nil { 151 r.err = toSpannerError(err) 152 } else if !r.rowd.done() { 153 r.err = errEarlyReadEnd() 154 } else { 155 r.err = iterator.Done 156 } 157 return nil, r.err 158} 159 160func extractRowCount(stats *sppb.ResultSetStats) (int64, error) { 161 if stats.RowCount == nil { 162 return 0, spannerErrorf(codes.Internal, "missing RowCount") 163 } 164 switch rc := stats.RowCount.(type) { 165 case *sppb.ResultSetStats_RowCountExact: 166 return rc.RowCountExact, nil 167 case *sppb.ResultSetStats_RowCountLowerBound: 168 return rc.RowCountLowerBound, nil 169 default: 170 return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount) 171 } 172} 173 174// Do calls the provided function once in sequence for each row in the 175// iteration. If the function returns a non-nil error, Do immediately returns 176// that error. 177// 178// If there are no rows in the iterator, Do will return nil without calling the 179// provided function. 180// 181// Do always calls Stop on the iterator. 182func (r *RowIterator) Do(f func(r *Row) error) error { 183 defer r.Stop() 184 for { 185 row, err := r.Next() 186 switch err { 187 case iterator.Done: 188 return nil 189 case nil: 190 if err = f(row); err != nil { 191 return err 192 } 193 default: 194 return err 195 } 196 } 197} 198 199// Stop terminates the iteration. It should be called after you finish using the 200// iterator. 201func (r *RowIterator) Stop() { 202 if r.streamd != nil { 203 defer trace.EndSpan(r.streamd.ctx, r.err) 204 } 205 if r.cancel != nil { 206 r.cancel() 207 } 208 if r.release != nil { 209 r.release(r.err) 210 if r.err == nil { 211 r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop") 212 } 213 r.release = nil 214 215 } 216} 217 218// partialResultQueue implements a simple FIFO queue. The zero value is a valid 219// queue. 220type partialResultQueue struct { 221 q []*sppb.PartialResultSet 222 first int 223 last int 224 n int // number of elements in queue 225} 226 227// empty returns if the partialResultQueue is empty. 228func (q *partialResultQueue) empty() bool { 229 return q.n == 0 230} 231 232// errEmptyQueue returns error for dequeuing an empty queue. 233func errEmptyQueue() error { 234 return spannerErrorf(codes.OutOfRange, "empty partialResultQueue") 235} 236 237// peekLast returns the last item in partialResultQueue; if the queue 238// is empty, it returns error. 239func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) { 240 if q.empty() { 241 return nil, errEmptyQueue() 242 } 243 return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil 244} 245 246// push adds an item to the tail of partialResultQueue. 247func (q *partialResultQueue) push(r *sppb.PartialResultSet) { 248 if q.q == nil { 249 q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */) 250 } 251 if q.n == cap(q.q) { 252 buf := make([]*sppb.PartialResultSet, cap(q.q)*2) 253 for i := 0; i < q.n; i++ { 254 buf[i] = q.q[(q.first+i)%cap(q.q)] 255 } 256 q.q = buf 257 q.first = 0 258 q.last = q.n 259 } 260 q.q[q.last] = r 261 q.last = (q.last + 1) % cap(q.q) 262 q.n++ 263} 264 265// pop removes an item from the head of partialResultQueue and returns it. 266func (q *partialResultQueue) pop() *sppb.PartialResultSet { 267 if q.n == 0 { 268 return nil 269 } 270 r := q.q[q.first] 271 q.q[q.first] = nil 272 q.first = (q.first + 1) % cap(q.q) 273 q.n-- 274 return r 275} 276 277// clear empties partialResultQueue. 278func (q *partialResultQueue) clear() { 279 *q = partialResultQueue{} 280} 281 282// dump retrieves all items from partialResultQueue and return them in a slice. 283// It is used only in tests. 284func (q *partialResultQueue) dump() []*sppb.PartialResultSet { 285 var dq []*sppb.PartialResultSet 286 for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) { 287 dq = append(dq, q.q[i]) 288 } 289 return dq 290} 291 292// resumableStreamDecoderState encodes resumableStreamDecoder's status. See also 293// the comments for resumableStreamDecoder.Next. 294type resumableStreamDecoderState int 295 296const ( 297 unConnected resumableStreamDecoderState = iota // 0 298 queueingRetryable // 1 299 queueingUnretryable // 2 300 aborted // 3 301 finished // 4 302) 303 304// resumableStreamDecoder provides a resumable interface for receiving 305// sppb.PartialResultSet(s) from a given query wrapped by 306// resumableStreamDecoder.rpc(). 307type resumableStreamDecoder struct { 308 // state is the current status of resumableStreamDecoder, see also 309 // the comments for resumableStreamDecoder.Next. 310 state resumableStreamDecoderState 311 312 // stateWitness when non-nil is called to observe state change, 313 // used for testing. 314 stateWitness func(resumableStreamDecoderState) 315 316 // ctx is the caller's context, used for cancel/timeout Next(). 317 ctx context.Context 318 319 // rpc is a factory of streamingReceiver, which might resume 320 // a previous stream from the point encoded in restartToken. 321 // rpc is always a wrapper of a Cloud Spanner query which is 322 // resumable. 323 rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error) 324 325 // replaceSessionFunc is a function that can be used to replace the session 326 // that is being used to execute the read operation. This function should 327 // only be defined for single-use transactions that can safely retry the 328 // read operation on a new session. If this function is nil, the stream 329 // does not support retrying the query on a new session. 330 replaceSessionFunc func(ctx context.Context) error 331 332 // logger is the logger to use. 333 logger *log.Logger 334 335 // stream is the current RPC streaming receiver. 336 stream streamingReceiver 337 338 // q buffers received yet undecoded partial results. 339 q partialResultQueue 340 341 // bytesBetweenResumeTokens is the proxy of the byte size of 342 // PartialResultSets being queued between two resume tokens. Once 343 // bytesBetweenResumeTokens is greater than maxBytesBetweenResumeTokens, 344 // resumableStreamDecoder goes into queueingUnretryable state. 345 bytesBetweenResumeTokens int32 346 347 // maxBytesBetweenResumeTokens is the max number of bytes that can be 348 // buffered between two resume tokens. It is always copied from the global 349 // maxBytesBetweenResumeTokens atomically. 350 maxBytesBetweenResumeTokens int32 351 352 // np is the next sppb.PartialResultSet ready to be returned 353 // to caller of resumableStreamDecoder.Get(). 354 np *sppb.PartialResultSet 355 356 // resumeToken stores the resume token that resumableStreamDecoder has 357 // last revealed to caller. 358 resumeToken []byte 359 360 // err is the last error resumableStreamDecoder has encountered so far. 361 err error 362 363 // backoff is used for the retry settings 364 backoff gax.Backoff 365} 366 367// newResumableStreamDecoder creates a new resumeableStreamDecoder instance. 368// Parameter rpc should be a function that creates a new stream beginning at the 369// restartToken if non-nil. 370func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder { 371 return &resumableStreamDecoder{ 372 ctx: ctx, 373 logger: logger, 374 rpc: rpc, 375 replaceSessionFunc: replaceSession, 376 maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens), 377 backoff: DefaultRetryBackoff, 378 } 379} 380 381// changeState fulfills state transition for resumableStateDecoder. 382func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) { 383 if d.state == queueingRetryable && d.state != target { 384 // Reset bytesBetweenResumeTokens because it is only meaningful/changed 385 // under queueingRetryable state. 386 d.bytesBetweenResumeTokens = 0 387 } 388 d.state = target 389 if d.stateWitness != nil { 390 d.stateWitness(target) 391 } 392} 393 394// isNewResumeToken returns if the observed resume token is different from 395// the one returned from server last time. 396func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool { 397 if rt == nil { 398 return false 399 } 400 if bytes.Equal(rt, d.resumeToken) { 401 return false 402 } 403 return true 404} 405 406// Next advances to the next available partial result set. If error or no 407// more, returns false, call Err to determine if an error was encountered. 408// The following diagram illustrates the state machine of resumableStreamDecoder 409// that Next() implements. Note that state transition can be only triggered by 410// RPC activities. 411/* 412 rpc() fails retryable 413 +---------+ 414 | | rpc() fails unretryable/ctx timeouts or cancelled 415 | | +------------------------------------------------+ 416 | | | | 417 | v | v 418 | +---+---+---+ +--------+ +------+--+ 419 +-----+unConnected| |finished| | aborted |<----+ 420 | | ++-----+-+ +------+--+ | 421 +---+----+--+ ^ ^ ^ | 422 | ^ | | | | 423 | | | | recv() fails | 424 | | | | | | 425 | |recv() fails retryable | | | | 426 | |with valid ctx | | | | 427 | | | | | | 428 rpc() succeeds | +-----------------------+ | | | 429 | | | recv EOF recv EOF | | 430 | | | | | | 431 v | | Queue size exceeds | | | 432 +---+----+---+----+threshold +-------+-----------+ | | 433+---------->+ +--------------->+ +-+ | 434| |queueingRetryable| |queueingUnretryable| | 435| | +<---------------+ | | 436| +---+----------+--+ pop() returns +--+----+-----------+ | 437| | | resume token | ^ | 438| | | | | | 439| | | | | | 440+---------------+ | | | | 441 recv() succeeds | +----+ | 442 | recv() succeeds | 443 | | 444 | | 445 | | 446 | | 447 | | 448 +--------------------------------------------------+ 449 recv() fails unretryable 450 451*/ 452var ( 453 // maxBytesBetweenResumeTokens is the maximum amount of bytes that 454 // resumableStreamDecoder in queueingRetryable state can use to queue 455 // PartialResultSets before getting into queueingUnretryable state. 456 maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024) 457) 458 459func (d *resumableStreamDecoder) next() bool { 460 retryer := onCodes(d.backoff, codes.Unavailable, codes.Internal) 461 for { 462 switch d.state { 463 case unConnected: 464 // If no gRPC stream is available, try to initiate one. 465 d.stream, d.err = d.rpc(d.ctx, d.resumeToken) 466 if d.err == nil { 467 d.changeState(queueingRetryable) 468 continue 469 } 470 delay, shouldRetry := retryer.Retry(d.err) 471 if !shouldRetry { 472 d.changeState(aborted) 473 continue 474 } 475 trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay) 476 if err := gax.Sleep(d.ctx, delay); err == nil { 477 // Be explicit about state transition, although the 478 // state doesn't actually change. State transition 479 // will be triggered only by RPC activity, regardless of 480 // whether there is an actual state change or not. 481 d.changeState(unConnected) 482 } else { 483 d.err = err 484 d.changeState(aborted) 485 } 486 continue 487 488 case queueingRetryable: 489 fallthrough 490 case queueingUnretryable: 491 // Receiving queue is not empty. 492 last, err := d.q.peekLast() 493 if err != nil { 494 // Only the case that receiving queue is empty could cause 495 // peekLast to return error and in such case, we should try to 496 // receive from stream. 497 d.tryRecv(retryer) 498 continue 499 } 500 if d.isNewResumeToken(last.ResumeToken) { 501 // Got new resume token, return buffered sppb.PartialResultSets 502 // to caller. 503 d.np = d.q.pop() 504 if d.q.empty() { 505 d.bytesBetweenResumeTokens = 0 506 // The new resume token was just popped out from queue, 507 // record it. 508 d.resumeToken = d.np.ResumeToken 509 d.changeState(queueingRetryable) 510 } 511 return true 512 } 513 if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable { 514 d.changeState(queueingUnretryable) 515 continue 516 } 517 if d.state == queueingUnretryable { 518 // When there is no resume token observed, only yield 519 // sppb.PartialResultSets to caller under queueingUnretryable 520 // state. 521 d.np = d.q.pop() 522 return true 523 } 524 // Needs to receive more from gRPC stream till a new resume token 525 // is observed. 526 d.tryRecv(retryer) 527 continue 528 case aborted: 529 // Discard all pending items because none of them should be yield 530 // to caller. 531 d.q.clear() 532 return false 533 case finished: 534 // If query has finished, check if there are still buffered messages. 535 if d.q.empty() { 536 // No buffered PartialResultSet. 537 return false 538 } 539 // Although query has finished, there are still buffered 540 // PartialResultSets. 541 d.np = d.q.pop() 542 return true 543 544 default: 545 logf(d.logger, "Unexpected resumableStreamDecoder.state: %v", d.state) 546 return false 547 } 548 } 549} 550 551// tryRecv attempts to receive a PartialResultSet from gRPC stream. 552func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) { 553 var res *sppb.PartialResultSet 554 res, d.err = d.stream.Recv() 555 if d.err == nil { 556 d.q.push(res) 557 if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) { 558 d.bytesBetweenResumeTokens += int32(proto.Size(res)) 559 } 560 d.changeState(d.state) 561 return 562 } 563 if d.err == io.EOF { 564 d.err = nil 565 d.changeState(finished) 566 return 567 } 568 if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil { 569 // A 'Session not found' error occurred before we received a resume 570 // token and a replaceSessionFunc function is defined. Try to restart 571 // the stream on a new session. 572 if err := d.replaceSessionFunc(d.ctx); err != nil { 573 d.err = err 574 d.changeState(aborted) 575 return 576 } 577 } else { 578 delay, shouldRetry := retryer.Retry(d.err) 579 if !shouldRetry || d.state != queueingRetryable { 580 d.changeState(aborted) 581 return 582 } 583 if err := gax.Sleep(d.ctx, delay); err != nil { 584 d.err = err 585 d.changeState(aborted) 586 return 587 } 588 } 589 // Clear error and retry the stream. 590 d.err = nil 591 // Discard all queue items (none have resume tokens). 592 d.q.clear() 593 d.stream = nil 594 d.changeState(unConnected) 595} 596 597// get returns the most recent PartialResultSet generated by a call to next. 598func (d *resumableStreamDecoder) get() *sppb.PartialResultSet { 599 return d.np 600} 601 602// lastErr returns the last non-EOF error encountered. 603func (d *resumableStreamDecoder) lastErr() error { 604 return d.err 605} 606 607// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner 608// Rows. 609type partialResultSetDecoder struct { 610 row Row 611 tx *sppb.Transaction 612 chunked bool // if true, next value should be merged with last values 613 // entry. 614 ts time.Time // read timestamp 615} 616 617// yield checks we have a complete row, and if so returns it. A row is not 618// complete if it doesn't have enough columns, or if this is a chunked response 619// and there are no further values to process. 620func (p *partialResultSetDecoder) yield(chunked, last bool) *Row { 621 if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) { 622 // When partialResultSetDecoder gets enough number of Column values. 623 // There are two cases that a new Row should be yield: 624 // 625 // 1. The incoming PartialResultSet is not chunked; 626 // 2. The incoming PartialResultSet is chunked, but the 627 // proto3.Value being merged is not the last one in 628 // the PartialResultSet. 629 // 630 // Use a fresh Row to simplify clients that want to use yielded results 631 // after the next row is retrieved. Note that fields is never changed 632 // so it doesn't need to be copied. 633 fresh := Row{ 634 fields: p.row.fields, 635 vals: make([]*proto3.Value, len(p.row.vals)), 636 } 637 copy(fresh.vals, p.row.vals) 638 p.row.vals = p.row.vals[:0] // empty and reuse slice 639 return &fresh 640 } 641 return nil 642} 643 644// yieldTx returns transaction information via caller supplied callback. 645func errChunkedEmptyRow() error { 646 return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row") 647} 648 649// add tries to merge a new PartialResultSet into buffered Row. It returns any 650// rows that have been completed as a result. 651func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) { 652 var rows []*Row 653 if r.Metadata != nil { 654 // Metadata should only be returned in the first result. 655 if p.row.fields == nil { 656 p.row.fields = r.Metadata.RowType.Fields 657 } 658 if p.tx == nil && r.Metadata.Transaction != nil { 659 p.tx = r.Metadata.Transaction 660 if p.tx.ReadTimestamp != nil { 661 p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos)) 662 } 663 } 664 } 665 if len(r.Values) == 0 { 666 return nil, nil 667 } 668 if p.chunked { 669 p.chunked = false 670 // Try to merge first value in r.Values into uncompleted row. 671 last := len(p.row.vals) - 1 672 if last < 0 { // sanity check 673 return nil, errChunkedEmptyRow() 674 } 675 var err error 676 // If p is chunked, then we should always try to merge p.last with 677 // r.first. 678 if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil { 679 return nil, err 680 } 681 r.Values = r.Values[1:] 682 // Merge is done, try to yield a complete Row. 683 if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil { 684 rows = append(rows, row) 685 } 686 } 687 for i, v := range r.Values { 688 // The rest values in r can be appened into p directly. 689 p.row.vals = append(p.row.vals, v) 690 // Again, check to see if a complete Row can be yielded because of the 691 // newly added value. 692 if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil { 693 rows = append(rows, row) 694 } 695 } 696 if r.ChunkedValue { 697 // After dealing with all values in r, if r is chunked then p must be 698 // also chunked. 699 p.chunked = true 700 } 701 return rows, nil 702} 703 704// isMergeable returns if a protobuf Value can be potentially merged with other 705// protobuf Values. 706func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool { 707 switch a.Kind.(type) { 708 case *proto3.Value_StringValue: 709 return true 710 case *proto3.Value_ListValue: 711 return true 712 default: 713 return false 714 } 715} 716 717// errIncompatibleMergeTypes returns error for incompatible protobuf types that 718// cannot be merged by partialResultSetDecoder. 719func errIncompatibleMergeTypes(a, b *proto3.Value) error { 720 return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind) 721} 722 723// errUnsupportedMergeType returns error for protobuf type that cannot be merged 724// to other protobufs. 725func errUnsupportedMergeType(a *proto3.Value) error { 726 return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind) 727} 728 729// merge tries to combine two protobuf Values if possible. 730func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) { 731 var err error 732 typeErr := errIncompatibleMergeTypes(a, b) 733 switch t := a.Kind.(type) { 734 case *proto3.Value_StringValue: 735 s, ok := b.Kind.(*proto3.Value_StringValue) 736 if !ok { 737 return nil, typeErr 738 } 739 return &proto3.Value{ 740 Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue}, 741 }, nil 742 case *proto3.Value_ListValue: 743 l, ok := b.Kind.(*proto3.Value_ListValue) 744 if !ok { 745 return nil, typeErr 746 } 747 if l.ListValue == nil || len(l.ListValue.Values) <= 0 { 748 // b is an empty list, just return a. 749 return a, nil 750 } 751 if t.ListValue == nil || len(t.ListValue.Values) <= 0 { 752 // a is an empty list, just return b. 753 return b, nil 754 } 755 if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) { 756 // When the last item in a is of type String, List or Struct 757 // (encoded into List by Cloud Spanner), try to Merge last item in 758 // a and first item in b. 759 t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0]) 760 if err != nil { 761 return nil, err 762 } 763 l.ListValue.Values = l.ListValue.Values[1:] 764 } 765 return &proto3.Value{ 766 Kind: &proto3.Value_ListValue{ 767 ListValue: &proto3.ListValue{ 768 Values: append(t.ListValue.Values, l.ListValue.Values...), 769 }, 770 }, 771 }, nil 772 default: 773 return nil, errUnsupportedMergeType(a) 774 } 775 776} 777 778// Done returns if partialResultSetDecoder has already done with all buffered 779// values. 780func (p *partialResultSetDecoder) done() bool { 781 // There is no explicit end of stream marker, but ending part way through a 782 // row is obviously bad, or ending with the last column still awaiting 783 // completion. 784 return len(p.row.vals) == 0 && !p.chunked 785} 786