/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"io"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/internal/protostruct"
	"cloud.google.com/go/internal/trace"
	"cloud.google.com/go/spanner/internal/backoff"
	proto "github.com/golang/protobuf/proto"
	proto3 "github.com/golang/protobuf/ptypes/struct"
	"google.golang.org/api/iterator"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
)

// streamingReceiver is the interface for receiving data from a client side
// stream. Recv yields one *sppb.PartialResultSet per call, or an error.
type streamingReceiver interface {
	Recv() (*sppb.PartialResultSet, error)
}

// errEarlyReadEnd returns the FailedPrecondition error reported when a read
// finishes while the gRPC stream is still considered active.
func errEarlyReadEnd() error {
	return spannerErrorf(codes.FailedPrecondition, "read completed with active stream")
}

// stream is the internal fault tolerant method for streaming data from
// Cloud Spanner.
50func stream(ctx context.Context, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error)) *RowIterator { 51 ctx, cancel := context.WithCancel(ctx) 52 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator") 53 return &RowIterator{ 54 streamd: newResumableStreamDecoder(ctx, rpc), 55 rowd: &partialResultSetDecoder{}, 56 setTimestamp: setTimestamp, 57 release: release, 58 cancel: cancel, 59 } 60} 61 62// RowIterator is an iterator over Rows. 63type RowIterator struct { 64 // The plan for the query. Available after RowIterator.Next returns iterator.Done 65 // if QueryWithStats was called. 66 QueryPlan *sppb.QueryPlan 67 68 // Execution statistics for the query. Available after RowIterator.Next returns iterator.Done 69 // if QueryWithStats was called. 70 QueryStats map[string]interface{} 71 72 // For a DML statement, the number of rows affected. For PDML, this is a lower bound. 73 // Available for DML statements after RowIterator.Next returns iterator.Done. 74 RowCount int64 75 76 streamd *resumableStreamDecoder 77 rowd *partialResultSetDecoder 78 setTimestamp func(time.Time) 79 release func(error) 80 cancel func() 81 err error 82 rows []*Row 83 sawStats bool 84} 85 86// Next returns the next result. Its second return value is iterator.Done if 87// there are no more results. Once Next returns Done, all subsequent calls 88// will return Done. 
89func (r *RowIterator) Next() (*Row, error) { 90 if r.err != nil { 91 return nil, r.err 92 } 93 for len(r.rows) == 0 && r.streamd.next() { 94 prs := r.streamd.get() 95 if prs.Stats != nil { 96 r.sawStats = true 97 r.QueryPlan = prs.Stats.QueryPlan 98 r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats) 99 if prs.Stats.RowCount != nil { 100 rc, err := extractRowCount(prs.Stats) 101 if err != nil { 102 return nil, err 103 } 104 r.RowCount = rc 105 } 106 } 107 r.rows, r.err = r.rowd.add(prs) 108 if r.err != nil { 109 return nil, r.err 110 } 111 if !r.rowd.ts.IsZero() && r.setTimestamp != nil { 112 r.setTimestamp(r.rowd.ts) 113 r.setTimestamp = nil 114 } 115 } 116 if len(r.rows) > 0 { 117 row := r.rows[0] 118 r.rows = r.rows[1:] 119 return row, nil 120 } 121 if err := r.streamd.lastErr(); err != nil { 122 r.err = toSpannerError(err) 123 } else if !r.rowd.done() { 124 r.err = errEarlyReadEnd() 125 } else { 126 r.err = iterator.Done 127 } 128 return nil, r.err 129} 130 131func extractRowCount(stats *sppb.ResultSetStats) (int64, error) { 132 if stats.RowCount == nil { 133 return 0, spannerErrorf(codes.Internal, "missing RowCount") 134 } 135 switch rc := stats.RowCount.(type) { 136 case *sppb.ResultSetStats_RowCountExact: 137 return rc.RowCountExact, nil 138 case *sppb.ResultSetStats_RowCountLowerBound: 139 return rc.RowCountLowerBound, nil 140 default: 141 return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount) 142 } 143} 144 145// Do calls the provided function once in sequence for each row in the iteration. If the 146// function returns a non-nil error, Do immediately returns that error. 147// 148// If there are no rows in the iterator, Do will return nil without calling the 149// provided function. 150// 151// Do always calls Stop on the iterator. 
152func (r *RowIterator) Do(f func(r *Row) error) error { 153 defer r.Stop() 154 for { 155 row, err := r.Next() 156 switch err { 157 case iterator.Done: 158 return nil 159 case nil: 160 if err = f(row); err != nil { 161 return err 162 } 163 default: 164 return err 165 } 166 } 167} 168 169// Stop terminates the iteration. It should be called after you finish using the iterator. 170func (r *RowIterator) Stop() { 171 if r.streamd != nil { 172 defer trace.EndSpan(r.streamd.ctx, r.err) 173 } 174 if r.cancel != nil { 175 r.cancel() 176 } 177 if r.release != nil { 178 r.release(r.err) 179 if r.err == nil { 180 r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop") 181 } 182 r.release = nil 183 184 } 185} 186 187// partialResultQueue implements a simple FIFO queue. The zero value is a 188// valid queue. 189type partialResultQueue struct { 190 q []*sppb.PartialResultSet 191 first int 192 last int 193 n int // number of elements in queue 194} 195 196// empty returns if the partialResultQueue is empty. 197func (q *partialResultQueue) empty() bool { 198 return q.n == 0 199} 200 201// errEmptyQueue returns error for dequeuing an empty queue. 202func errEmptyQueue() error { 203 return spannerErrorf(codes.OutOfRange, "empty partialResultQueue") 204} 205 206// peekLast returns the last item in partialResultQueue; if the queue 207// is empty, it returns error. 208func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) { 209 if q.empty() { 210 return nil, errEmptyQueue() 211 } 212 return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil 213} 214 215// push adds an item to the tail of partialResultQueue. 
216func (q *partialResultQueue) push(r *sppb.PartialResultSet) { 217 if q.q == nil { 218 q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */) 219 } 220 if q.n == cap(q.q) { 221 buf := make([]*sppb.PartialResultSet, cap(q.q)*2) 222 for i := 0; i < q.n; i++ { 223 buf[i] = q.q[(q.first+i)%cap(q.q)] 224 } 225 q.q = buf 226 q.first = 0 227 q.last = q.n 228 } 229 q.q[q.last] = r 230 q.last = (q.last + 1) % cap(q.q) 231 q.n++ 232} 233 234// pop removes an item from the head of partialResultQueue and returns 235// it. 236func (q *partialResultQueue) pop() *sppb.PartialResultSet { 237 if q.n == 0 { 238 return nil 239 } 240 r := q.q[q.first] 241 q.q[q.first] = nil 242 q.first = (q.first + 1) % cap(q.q) 243 q.n-- 244 return r 245} 246 247// clear empties partialResultQueue. 248func (q *partialResultQueue) clear() { 249 *q = partialResultQueue{} 250} 251 252// dump retrieves all items from partialResultQueue and return them in a slice. 253// It is used only in tests. 254func (q *partialResultQueue) dump() []*sppb.PartialResultSet { 255 var dq []*sppb.PartialResultSet 256 for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) { 257 dq = append(dq, q.q[i]) 258 } 259 return dq 260} 261 262// resumableStreamDecoderState encodes resumableStreamDecoder's status. 263// See also the comments for resumableStreamDecoder.Next. 264type resumableStreamDecoderState int 265 266const ( 267 unConnected resumableStreamDecoderState = iota // 0 268 queueingRetryable // 1 269 queueingUnretryable // 2 270 aborted // 3 271 finished // 4 272) 273 274// resumableStreamDecoder provides a resumable interface for receiving 275// sppb.PartialResultSet(s) from a given query wrapped by 276// resumableStreamDecoder.rpc(). 277type resumableStreamDecoder struct { 278 // state is the current status of resumableStreamDecoder, see also 279 // the comments for resumableStreamDecoder.Next. 
280 state resumableStreamDecoderState 281 // stateWitness when non-nil is called to observe state change, 282 // used for testing. 283 stateWitness func(resumableStreamDecoderState) 284 // ctx is the caller's context, used for cancel/timeout Next(). 285 ctx context.Context 286 // rpc is a factory of streamingReceiver, which might resume 287 // a previous stream from the point encoded in restartToken. 288 // rpc is always a wrapper of a Cloud Spanner query which is 289 // resumable. 290 rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error) 291 // stream is the current RPC streaming receiver. 292 stream streamingReceiver 293 // q buffers received yet undecoded partial results. 294 q partialResultQueue 295 // bytesBetweenResumeTokens is the proxy of the byte size of PartialResultSets being queued 296 // between two resume tokens. Once bytesBetweenResumeTokens is greater than 297 // maxBytesBetweenResumeTokens, resumableStreamDecoder goes into queueingUnretryable state. 298 bytesBetweenResumeTokens int32 299 // maxBytesBetweenResumeTokens is the max number of bytes that can be buffered 300 // between two resume tokens. It is always copied from the global maxBytesBetweenResumeTokens 301 // atomically. 302 maxBytesBetweenResumeTokens int32 303 // np is the next sppb.PartialResultSet ready to be returned 304 // to caller of resumableStreamDecoder.Get(). 305 np *sppb.PartialResultSet 306 // resumeToken stores the resume token that resumableStreamDecoder has 307 // last revealed to caller. 308 resumeToken []byte 309 // retryCount is the number of retries that have been carried out so far 310 retryCount int 311 // err is the last error resumableStreamDecoder has encountered so far. 312 err error 313 // backoff to compute delays between retries. 314 backoff backoff.ExponentialBackoff 315} 316 317// newResumableStreamDecoder creates a new resumeableStreamDecoder instance. 
318// Parameter rpc should be a function that creates a new stream 319// beginning at the restartToken if non-nil. 320func newResumableStreamDecoder(ctx context.Context, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error)) *resumableStreamDecoder { 321 return &resumableStreamDecoder{ 322 ctx: ctx, 323 rpc: rpc, 324 maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens), 325 backoff: backoff.DefaultBackoff, 326 } 327} 328 329// changeState fulfills state transition for resumableStateDecoder. 330func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) { 331 if d.state == queueingRetryable && d.state != target { 332 // Reset bytesBetweenResumeTokens because it is only meaningful/changed under 333 // queueingRetryable state. 334 d.bytesBetweenResumeTokens = 0 335 } 336 d.state = target 337 if d.stateWitness != nil { 338 d.stateWitness(target) 339 } 340} 341 342// isNewResumeToken returns if the observed resume token is different from 343// the one returned from server last time. 344func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool { 345 if rt == nil { 346 return false 347 } 348 if bytes.Equal(rt, d.resumeToken) { 349 return false 350 } 351 return true 352} 353 354// Next advances to the next available partial result set. If error or no 355// more, returns false, call Err to determine if an error was encountered. 356// The following diagram illustrates the state machine of resumableStreamDecoder 357// that Next() implements. Note that state transition can be only triggered by 358// RPC activities. 
359/* 360 rpc() fails retryable 361 +---------+ 362 | | rpc() fails unretryable/ctx timeouts or cancelled 363 | | +------------------------------------------------+ 364 | | | | 365 | v | v 366 | +---+---+---+ +--------+ +------+--+ 367 +-----+unConnected| |finished| | aborted |<----+ 368 | | ++-----+-+ +------+--+ | 369 +---+----+--+ ^ ^ ^ | 370 | ^ | | | | 371 | | | | recv() fails | 372 | | | | | | 373 | |recv() fails retryable | | | | 374 | |with valid ctx | | | | 375 | | | | | | 376 rpc() succeeds | +-----------------------+ | | | 377 | | | recv EOF recv EOF | | 378 | | | | | | 379 v | | Queue size exceeds | | | 380 +---+----+---+----+threshold +-------+-----------+ | | 381+---------->+ +--------------->+ +-+ | 382| |queueingRetryable| |queueingUnretryable| | 383| | +<---------------+ | | 384| +---+----------+--+ pop() returns +--+----+-----------+ | 385| | | resume token | ^ | 386| | | | | | 387| | | | | | 388+---------------+ | | | | 389 recv() succeeds | +----+ | 390 | recv() succeeds | 391 | | 392 | | 393 | | 394 | | 395 | | 396 +--------------------------------------------------+ 397 recv() fails unretryable 398 399*/ 400var ( 401 // maxBytesBetweenResumeTokens is the maximum amount of bytes that resumableStreamDecoder 402 // in queueingRetryable state can use to queue PartialResultSets before getting 403 // into queueingUnretryable state. 404 maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024) 405) 406 407func (d *resumableStreamDecoder) next() bool { 408 for { 409 select { 410 case <-d.ctx.Done(): 411 // Do context check here so that even gRPC failed to do 412 // so, resumableStreamDecoder can still break the loop 413 // as expected. 414 d.err = errContextCanceled(d.ctx, d.err) 415 d.changeState(aborted) 416 default: 417 } 418 switch d.state { 419 case unConnected: 420 // If no gRPC stream is available, try to initiate one. 
421 if d.stream, d.err = d.rpc(d.ctx, d.resumeToken); d.err != nil { 422 if isRetryable(d.err) { 423 d.doBackOff() 424 // Be explicit about state transition, although the 425 // state doesn't actually change. State transition 426 // will be triggered only by RPC activity, regardless of 427 // whether there is an actual state change or not. 428 d.changeState(unConnected) 429 continue 430 } 431 d.changeState(aborted) 432 continue 433 } 434 d.resetBackOff() 435 d.changeState(queueingRetryable) 436 continue 437 case queueingRetryable: 438 fallthrough 439 case queueingUnretryable: 440 // Receiving queue is not empty. 441 last, err := d.q.peekLast() 442 if err != nil { 443 // Only the case that receiving queue is empty could cause peekLast to 444 // return error and in such case, we should try to receive from stream. 445 d.tryRecv() 446 continue 447 } 448 if d.isNewResumeToken(last.ResumeToken) { 449 // Got new resume token, return buffered sppb.PartialResultSets to caller. 450 d.np = d.q.pop() 451 if d.q.empty() { 452 d.bytesBetweenResumeTokens = 0 453 // The new resume token was just popped out from queue, record it. 454 d.resumeToken = d.np.ResumeToken 455 d.changeState(queueingRetryable) 456 } 457 return true 458 } 459 if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable { 460 d.changeState(queueingUnretryable) 461 continue 462 } 463 if d.state == queueingUnretryable { 464 // When there is no resume token observed, 465 // only yield sppb.PartialResultSets to caller under 466 // queueingUnretryable state. 467 d.np = d.q.pop() 468 return true 469 } 470 // Needs to receive more from gRPC stream till a new resume token 471 // is observed. 472 d.tryRecv() 473 continue 474 case aborted: 475 // Discard all pending items because none of them 476 // should be yield to caller. 477 d.q.clear() 478 return false 479 case finished: 480 // If query has finished, check if there are still buffered messages. 
481 if d.q.empty() { 482 // No buffered PartialResultSet. 483 return false 484 } 485 // Although query has finished, there are still buffered PartialResultSets. 486 d.np = d.q.pop() 487 return true 488 489 default: 490 log.Printf("Unexpected resumableStreamDecoder.state: %v", d.state) 491 return false 492 } 493 } 494} 495 496// tryRecv attempts to receive a PartialResultSet from gRPC stream. 497func (d *resumableStreamDecoder) tryRecv() { 498 var res *sppb.PartialResultSet 499 if res, d.err = d.stream.Recv(); d.err != nil { 500 if d.err == io.EOF { 501 d.err = nil 502 d.changeState(finished) 503 return 504 } 505 if isRetryable(d.err) && d.state == queueingRetryable { 506 d.err = nil 507 // Discard all queue items (none have resume tokens). 508 d.q.clear() 509 d.stream = nil 510 d.changeState(unConnected) 511 d.doBackOff() 512 return 513 } 514 d.changeState(aborted) 515 return 516 } 517 d.q.push(res) 518 if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) { 519 // adjusting d.bytesBetweenResumeTokens 520 d.bytesBetweenResumeTokens += int32(proto.Size(res)) 521 } 522 d.resetBackOff() 523 d.changeState(d.state) 524} 525 526// resetBackOff clears the internal retry counter of 527// resumableStreamDecoder so that the next exponential 528// backoff will start at a fresh state. 529func (d *resumableStreamDecoder) resetBackOff() { 530 d.retryCount = 0 531} 532 533// doBackoff does an exponential backoff sleep. 534func (d *resumableStreamDecoder) doBackOff() { 535 delay := d.backoff.Delay(d.retryCount) 536 trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay) 537 ticker := time.NewTicker(delay) 538 defer ticker.Stop() 539 d.retryCount++ 540 select { 541 case <-d.ctx.Done(): 542 case <-ticker.C: 543 } 544} 545 546// get returns the most recent PartialResultSet generated by a call to next. 547func (d *resumableStreamDecoder) get() *sppb.PartialResultSet { 548 return d.np 549} 550 551// lastErr returns the last non-EOF error encountered. 
552func (d *resumableStreamDecoder) lastErr() error { 553 return d.err 554} 555 556// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner 557// Rows. 558type partialResultSetDecoder struct { 559 row Row 560 tx *sppb.Transaction 561 chunked bool // if true, next value should be merged with last values entry. 562 ts time.Time // read timestamp 563} 564 565// yield checks we have a complete row, and if so returns it. A row is not 566// complete if it doesn't have enough columns, or if this is a chunked response 567// and there are no further values to process. 568func (p *partialResultSetDecoder) yield(chunked, last bool) *Row { 569 if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) { 570 // When partialResultSetDecoder gets enough number of 571 // Column values, There are two cases that a new Row 572 // should be yield: 573 // 1. The incoming PartialResultSet is not chunked; 574 // 2. The incoming PartialResultSet is chunked, but the 575 // proto3.Value being merged is not the last one in 576 // the PartialResultSet. 577 // 578 // Use a fresh Row to simplify clients that want to use yielded results 579 // after the next row is retrieved. Note that fields is never changed 580 // so it doesn't need to be copied. 581 fresh := Row{ 582 fields: p.row.fields, 583 vals: make([]*proto3.Value, len(p.row.vals)), 584 } 585 copy(fresh.vals, p.row.vals) 586 p.row.vals = p.row.vals[:0] // empty and reuse slice 587 return &fresh 588 } 589 return nil 590} 591 592// yieldTx returns transaction information via caller supplied callback. 593func errChunkedEmptyRow() error { 594 return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row") 595} 596 597// add tries to merge a new PartialResultSet into buffered Row. It returns 598// any rows that have been completed as a result. 
599func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) { 600 var rows []*Row 601 if r.Metadata != nil { 602 // Metadata should only be returned in the first result. 603 if p.row.fields == nil { 604 p.row.fields = r.Metadata.RowType.Fields 605 } 606 if p.tx == nil && r.Metadata.Transaction != nil { 607 p.tx = r.Metadata.Transaction 608 if p.tx.ReadTimestamp != nil { 609 p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos)) 610 } 611 } 612 } 613 if len(r.Values) == 0 { 614 return nil, nil 615 } 616 if p.chunked { 617 p.chunked = false 618 // Try to merge first value in r.Values into 619 // uncompleted row. 620 last := len(p.row.vals) - 1 621 if last < 0 { // sanity check 622 return nil, errChunkedEmptyRow() 623 } 624 var err error 625 // If p is chunked, then we should always try to merge p.last with r.first. 626 if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil { 627 return nil, err 628 } 629 r.Values = r.Values[1:] 630 // Merge is done, try to yield a complete Row. 631 if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil { 632 rows = append(rows, row) 633 } 634 } 635 for i, v := range r.Values { 636 // The rest values in r can be appened into p directly. 637 p.row.vals = append(p.row.vals, v) 638 // Again, check to see if a complete Row can be yielded because of 639 // the newly added value. 640 if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil { 641 rows = append(rows, row) 642 } 643 } 644 if r.ChunkedValue { 645 // After dealing with all values in r, if r is chunked then p must 646 // be also chunked. 647 p.chunked = true 648 } 649 return rows, nil 650} 651 652// isMergeable returns if a protobuf Value can be potentially merged with 653// other protobuf Values. 
654func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool { 655 switch a.Kind.(type) { 656 case *proto3.Value_StringValue: 657 return true 658 case *proto3.Value_ListValue: 659 return true 660 default: 661 return false 662 } 663} 664 665// errIncompatibleMergeTypes returns error for incompatible protobuf types 666// that cannot be merged by partialResultSetDecoder. 667func errIncompatibleMergeTypes(a, b *proto3.Value) error { 668 return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind) 669} 670 671// errUnsupportedMergeType returns error for protobuf type that cannot be 672// merged to other protobufs. 673func errUnsupportedMergeType(a *proto3.Value) error { 674 return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind) 675} 676 677// merge tries to combine two protobuf Values if possible. 678func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) { 679 var err error 680 typeErr := errIncompatibleMergeTypes(a, b) 681 switch t := a.Kind.(type) { 682 case *proto3.Value_StringValue: 683 s, ok := b.Kind.(*proto3.Value_StringValue) 684 if !ok { 685 return nil, typeErr 686 } 687 return &proto3.Value{ 688 Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue}, 689 }, nil 690 case *proto3.Value_ListValue: 691 l, ok := b.Kind.(*proto3.Value_ListValue) 692 if !ok { 693 return nil, typeErr 694 } 695 if l.ListValue == nil || len(l.ListValue.Values) <= 0 { 696 // b is an empty list, just return a. 697 return a, nil 698 } 699 if t.ListValue == nil || len(t.ListValue.Values) <= 0 { 700 // a is an empty list, just return b. 701 return b, nil 702 } 703 if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) { 704 // When the last item in a is of type String, 705 // List or Struct(encoded into List by Cloud Spanner), 706 // try to Merge last item in a and first item in b. 
707 t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0]) 708 if err != nil { 709 return nil, err 710 } 711 l.ListValue.Values = l.ListValue.Values[1:] 712 } 713 return &proto3.Value{ 714 Kind: &proto3.Value_ListValue{ 715 ListValue: &proto3.ListValue{ 716 Values: append(t.ListValue.Values, l.ListValue.Values...), 717 }, 718 }, 719 }, nil 720 default: 721 return nil, errUnsupportedMergeType(a) 722 } 723 724} 725 726// Done returns if partialResultSetDecoder has already done with all buffered 727// values. 728func (p *partialResultSetDecoder) done() bool { 729 // There is no explicit end of stream marker, but ending part way 730 // through a row is obviously bad, or ending with the last column still 731 // awaiting completion. 732 return len(p.row.vals) == 0 && !p.chunked 733} 734