/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"io"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/internal/protostruct"
	"cloud.google.com/go/internal/trace"
	"cloud.google.com/go/spanner/internal/backoff"
	proto "github.com/golang/protobuf/proto"
	proto3 "github.com/golang/protobuf/ptypes/struct"
	"google.golang.org/api/iterator"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
)

// streamingReceiver is the interface for receiving data from a client side
// stream.
type streamingReceiver interface {
	Recv() (*sppb.PartialResultSet, error)
}

// errEarlyReadEnd returns the error for a read that finishes while the gRPC
// stream is still active.
func errEarlyReadEnd() error {
	return spannerErrorf(codes.FailedPrecondition, "read completed with active stream")
}

// stream is the internal fault tolerant method for streaming data from
// Cloud Spanner.
func stream(ctx context.Context, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error)) *RowIterator {
	ctx, cancel := context.WithCancel(ctx)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
	return &RowIterator{
		streamd:      newResumableStreamDecoder(ctx, rpc),
		rowd:         &partialResultSetDecoder{},
		setTimestamp: setTimestamp,
		release:      release,
		cancel:       cancel,
	}
}

// RowIterator is an iterator over Rows.
type RowIterator struct {
	// The plan for the query. Available after RowIterator.Next returns iterator.Done
	// if QueryWithStats was called.
	QueryPlan *sppb.QueryPlan

	// Execution statistics for the query. Available after RowIterator.Next returns iterator.Done
	// if QueryWithStats was called.
	QueryStats map[string]interface{}

	// For a DML statement, the number of rows affected. For PDML, this is a lower bound.
	// Available for DML statements after RowIterator.Next returns iterator.Done.
	RowCount int64

	streamd      *resumableStreamDecoder
	rowd         *partialResultSetDecoder
	setTimestamp func(time.Time)
	release      func(error)
	cancel       func()
	err          error
	rows         []*Row
	sawStats     bool
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *RowIterator) Next() (*Row, error) {
	if r.err != nil {
		return nil, r.err
	}
	for len(r.rows) == 0 && r.streamd.next() {
		prs := r.streamd.get()
		if prs.Stats != nil {
			r.sawStats = true
			r.QueryPlan = prs.Stats.QueryPlan
			r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats)
			if prs.Stats.RowCount != nil {
				rc, err := extractRowCount(prs.Stats)
				if err != nil {
					return nil, err
				}
				r.RowCount = rc
			}
		}
		r.rows, r.err = r.rowd.add(prs)
		if r.err != nil {
			return nil, r.err
		}
		if !r.rowd.ts.IsZero() && r.setTimestamp != nil {
			r.setTimestamp(r.rowd.ts)
			r.setTimestamp = nil
		}
	}
	if len(r.rows) > 0 {
		row := r.rows[0]
		r.rows = r.rows[1:]
		return row, nil
	}
	if err := r.streamd.lastErr(); err != nil {
		r.err = toSpannerError(err)
	} else if !r.rowd.done() {
		r.err = errEarlyReadEnd()
	} else {
		r.err = iterator.Done
	}
	return nil, r.err
}
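// A minimal consumption sketch, written from a caller's perspective and for
// illustration only: a RowIterator, such as one returned by Client.Single().Query,
// is typically drained with Next until iterator.Done and stopped when abandoned
// early.
//
//	iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1"})
//	defer iter.Stop()
//	for {
//		row, err := iter.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		// ... use row ...
//	}
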

func extractRowCount(stats *sppb.ResultSetStats) (int64, error) {
	if stats.RowCount == nil {
		return 0, spannerErrorf(codes.Internal, "missing RowCount")
	}
	switch rc := stats.RowCount.(type) {
	case *sppb.ResultSetStats_RowCountExact:
		return rc.RowCountExact, nil
	case *sppb.ResultSetStats_RowCountLowerBound:
		return rc.RowCountLowerBound, nil
	default:
		return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount)
	}
}

// Do calls the provided function once in sequence for each row in the iteration.  If the
// function returns a non-nil error, Do immediately returns that error.
//
// If there are no rows in the iterator, Do will return nil without calling the
// provided function.
//
// Do always calls Stop on the iterator.
func (r *RowIterator) Do(f func(r *Row) error) error {
	defer r.Stop()
	for {
		row, err := r.Next()
		switch err {
		case iterator.Done:
			return nil
		case nil:
			if err = f(row); err != nil {
				return err
			}
		default:
			return err
		}
	}
}
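// An equivalent sketch using Do, again from a caller's perspective and for
// illustration only; Do handles iterator.Done and calls Stop automatically.
//
//	err := iter.Do(func(row *spanner.Row) error {
//		var id int64
//		return row.Column(0, &id)
//	})
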

// Stop terminates the iteration. It should be called after you finish using the iterator.
func (r *RowIterator) Stop() {
	if r.streamd != nil {
		defer trace.EndSpan(r.streamd.ctx, r.err)
	}
	if r.cancel != nil {
		r.cancel()
	}
	if r.release != nil {
		r.release(r.err)
		if r.err == nil {
			r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
		}
		r.release = nil
	}
}

// partialResultQueue implements a simple FIFO queue.  The zero value is a
// valid queue.
type partialResultQueue struct {
	q     []*sppb.PartialResultSet
	first int
	last  int
	n     int // number of elements in queue
}

// empty reports whether the partialResultQueue is empty.
func (q *partialResultQueue) empty() bool {
	return q.n == 0
}

// errEmptyQueue returns the error for dequeuing an empty queue.
func errEmptyQueue() error {
	return spannerErrorf(codes.OutOfRange, "empty partialResultQueue")
}

// peekLast returns the last item in partialResultQueue; if the queue
// is empty, it returns an error.
func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) {
	if q.empty() {
		return nil, errEmptyQueue()
	}
	return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil
}

// push adds an item to the tail of partialResultQueue.
func (q *partialResultQueue) push(r *sppb.PartialResultSet) {
	if q.q == nil {
		q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */)
	}
	if q.n == cap(q.q) {
		// The ring buffer is full: double its capacity and compact the
		// existing items to the front of the new buffer.
		buf := make([]*sppb.PartialResultSet, cap(q.q)*2)
		for i := 0; i < q.n; i++ {
			buf[i] = q.q[(q.first+i)%cap(q.q)]
		}
		q.q = buf
		q.first = 0
		q.last = q.n
	}
	q.q[q.last] = r
	q.last = (q.last + 1) % cap(q.q)
	q.n++
}

// pop removes an item from the head of partialResultQueue and returns
// it.
func (q *partialResultQueue) pop() *sppb.PartialResultSet {
	if q.n == 0 {
		return nil
	}
	r := q.q[q.first]
	q.q[q.first] = nil
	q.first = (q.first + 1) % cap(q.q)
	q.n--
	return r
}
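// Illustrative only: the zero value is immediately usable, and push/pop behave
// as a FIFO backed by a growable ring buffer.
//
//	var q partialResultQueue
//	q.push(&sppb.PartialResultSet{})
//	r := q.pop() // returns the item pushed above; nil once the queue is empty
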

// clear empties partialResultQueue.
func (q *partialResultQueue) clear() {
	*q = partialResultQueue{}
}

// dump retrieves all items from partialResultQueue and returns them in a slice.
// It is used only in tests.
func (q *partialResultQueue) dump() []*sppb.PartialResultSet {
	var dq []*sppb.PartialResultSet
	for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) {
		dq = append(dq, q.q[i])
	}
	return dq
}

// resumableStreamDecoderState encodes resumableStreamDecoder's status.
// See also the comments for resumableStreamDecoder.next.
type resumableStreamDecoderState int

const (
	unConnected         resumableStreamDecoderState = iota // 0
	queueingRetryable                                      // 1
	queueingUnretryable                                    // 2
	aborted                                                // 3
	finished                                               // 4
)

// resumableStreamDecoder provides a resumable interface for receiving
// sppb.PartialResultSet(s) from a given query wrapped by
// resumableStreamDecoder.rpc().
type resumableStreamDecoder struct {
	// state is the current status of resumableStreamDecoder, see also
	// the comments for resumableStreamDecoder.next.
	state resumableStreamDecoderState
	// stateWitness when non-nil is called to observe state changes,
	// used for testing.
	stateWitness func(resumableStreamDecoderState)
	// ctx is the caller's context, used to cancel or time out next().
	ctx context.Context
	// rpc is a factory of streamingReceiver, which might resume
	// a previous stream from the point encoded in restartToken.
	// rpc is always a wrapper around a Cloud Spanner query which is
	// resumable.
	rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error)
	// stream is the current RPC streaming receiver.
	stream streamingReceiver
	// q buffers received yet undecoded partial results.
	q partialResultQueue
	// bytesBetweenResumeTokens approximates the byte size of the PartialResultSets
	// queued between two resume tokens. Once bytesBetweenResumeTokens is greater than
	// maxBytesBetweenResumeTokens, resumableStreamDecoder goes into queueingUnretryable state.
	bytesBetweenResumeTokens int32
	// maxBytesBetweenResumeTokens is the max number of bytes that can be buffered
	// between two resume tokens. It is always copied from the global maxBytesBetweenResumeTokens
	// atomically.
	maxBytesBetweenResumeTokens int32
	// np is the next sppb.PartialResultSet ready to be returned
	// to the caller of resumableStreamDecoder.get().
	np *sppb.PartialResultSet
	// resumeToken stores the resume token that resumableStreamDecoder has
	// last revealed to the caller.
	resumeToken []byte
	// retryCount is the number of retries that have been carried out so far.
	retryCount int
	// err is the last error resumableStreamDecoder has encountered so far.
	err error
	// backoff to compute delays between retries.
	backoff backoff.ExponentialBackoff
}

// newResumableStreamDecoder creates a new resumableStreamDecoder instance.
// Parameter rpc should be a function that creates a new stream
// beginning at the restartToken if non-nil.
func newResumableStreamDecoder(ctx context.Context, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error)) *resumableStreamDecoder {
	return &resumableStreamDecoder{
		ctx:                         ctx,
		rpc:                         rpc,
		maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
		backoff:                     backoff.DefaultBackoff,
	}
}

// changeState performs a state transition for resumableStreamDecoder.
func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) {
	if d.state == queueingRetryable && d.state != target {
		// Reset bytesBetweenResumeTokens because it is only meaningful/changed under
		// queueingRetryable state.
		d.bytesBetweenResumeTokens = 0
	}
	d.state = target
	if d.stateWitness != nil {
		d.stateWitness(target)
	}
}

// isNewResumeToken reports whether the observed resume token differs from
// the one last returned by the server.
func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool {
	if rt == nil {
		return false
	}
	if bytes.Equal(rt, d.resumeToken) {
		return false
	}
	return true
}

// next advances to the next available partial result set. If there is an error
// or no more results, it returns false; call lastErr to determine whether an
// error was encountered. The following diagram illustrates the state machine of
// resumableStreamDecoder that next() implements. Note that state transitions
// can only be triggered by RPC activity.
/*
        rpc() fails retryable
      +---------+
      |         |    rpc() fails unretryable/ctx timeouts or cancelled
      |         |   +------------------------------------------------+
      |         |   |                                                |
      |         v   |                                                v
      |     +---+---+---+                       +--------+    +------+--+
      +-----+unConnected|                       |finished|    | aborted |<----+
            |           |                       ++-----+-+    +------+--+     |
            +---+----+--+                        ^     ^             ^        |
                |    ^                           |     |             |        |
                |    |                           |     |     recv() fails     |
                |    |                           |     |             |        |
                |    |recv() fails retryable     |     |             |        |
                |    |with valid ctx             |     |             |        |
                |    |                           |     |             |        |
      rpc() succeeds |   +-----------------------+     |             |        |
                |    |   |         recv EOF         recv EOF         |        |
                |    |   |                             |             |        |
                v    |   |     Queue size exceeds      |             |        |
            +---+----+---+----+threshold       +-------+-----------+ |        |
+---------->+                 +--------------->+                   +-+        |
|           |queueingRetryable|                |queueingUnretryable|          |
|           |                 +<---------------+                   |          |
|           +---+----------+--+ pop() returns  +--+----+-----------+          |
|               |          |    resume token      |    ^                      |
|               |          |                      |    |                      |
|               |          |                      |    |                      |
+---------------+          |                      |    |                      |
   recv() succeeds         |                      +----+                      |
                           |                      recv() succeeds             |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           +--------------------------------------------------+
                                               recv() fails unretryable

*/
var (
	// maxBytesBetweenResumeTokens is the maximum amount of bytes that resumableStreamDecoder
	// in queueingRetryable state can use to queue PartialResultSets before getting
	// into queueingUnretryable state.
	maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024)
)

func (d *resumableStreamDecoder) next() bool {
	for {
		select {
		case <-d.ctx.Done():
			// Check the context here so that, even if gRPC fails to do so,
			// resumableStreamDecoder can still break the loop as expected.
			d.err = errContextCanceled(d.ctx, d.err)
			d.changeState(aborted)
		default:
		}
		switch d.state {
		case unConnected:
			// If no gRPC stream is available, try to initiate one.
			if d.stream, d.err = d.rpc(d.ctx, d.resumeToken); d.err != nil {
				if isRetryable(d.err) {
					d.doBackOff()
					// Be explicit about the state transition, although the
					// state doesn't actually change. State transitions are
					// triggered only by RPC activity, regardless of
					// whether there is an actual state change or not.
					d.changeState(unConnected)
					continue
				}
				d.changeState(aborted)
				continue
			}
			d.resetBackOff()
			d.changeState(queueingRetryable)
			continue
		case queueingRetryable:
			fallthrough
		case queueingUnretryable:
			// Peek at the tail of the receiving queue, if any.
			last, err := d.q.peekLast()
			if err != nil {
				// peekLast returns an error only when the receiving queue is
				// empty; in that case, try to receive more from the stream.
				d.tryRecv()
				continue
			}
			if d.isNewResumeToken(last.ResumeToken) {
				// Got a new resume token; return buffered sppb.PartialResultSets to the caller.
				d.np = d.q.pop()
				if d.q.empty() {
					d.bytesBetweenResumeTokens = 0
					// The new resume token was just popped off the queue; record it.
					d.resumeToken = d.np.ResumeToken
					d.changeState(queueingRetryable)
				}
				return true
			}
			if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable {
				d.changeState(queueingUnretryable)
				continue
			}
			if d.state == queueingUnretryable {
				// When no resume token has been observed, only yield
				// sppb.PartialResultSets to the caller in the
				// queueingUnretryable state.
				d.np = d.q.pop()
				return true
			}
			// Need to receive more from the gRPC stream until a new resume token
			// is observed.
			d.tryRecv()
			continue
		case aborted:
			// Discard all pending items because none of them
			// should be yielded to the caller.
			d.q.clear()
			return false
		case finished:
			// If the query has finished, check whether there are still buffered messages.
			if d.q.empty() {
				// No buffered PartialResultSet.
				return false
			}
			// Although the query has finished, there are still buffered PartialResultSets.
			d.np = d.q.pop()
			return true

		default:
			log.Printf("Unexpected resumableStreamDecoder.state: %v", d.state)
			return false
		}
	}
}

// tryRecv attempts to receive a PartialResultSet from the gRPC stream.
func (d *resumableStreamDecoder) tryRecv() {
	var res *sppb.PartialResultSet
	if res, d.err = d.stream.Recv(); d.err != nil {
		if d.err == io.EOF {
			d.err = nil
			d.changeState(finished)
			return
		}
		if isRetryable(d.err) && d.state == queueingRetryable {
			d.err = nil
			// Discard all queue items (none have resume tokens).
			d.q.clear()
			d.stream = nil
			d.changeState(unConnected)
			d.doBackOff()
			return
		}
		d.changeState(aborted)
		return
	}
	d.q.push(res)
	if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) {
		// No new resume token was observed; account for the size of this result
		// in bytesBetweenResumeTokens.
		d.bytesBetweenResumeTokens += int32(proto.Size(res))
	}
	d.resetBackOff()
	d.changeState(d.state)
}

// resetBackOff clears the internal retry counter of
// resumableStreamDecoder so that the next exponential
// backoff will start at a fresh state.
func (d *resumableStreamDecoder) resetBackOff() {
	d.retryCount = 0
}

// doBackOff sleeps for an exponentially increasing backoff delay; it returns
// early if the context is done.
func (d *resumableStreamDecoder) doBackOff() {
	delay := d.backoff.Delay(d.retryCount)
	trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay)
	ticker := time.NewTicker(delay)
	defer ticker.Stop()
	d.retryCount++
	select {
	case <-d.ctx.Done():
	case <-ticker.C:
	}
}

// get returns the most recent PartialResultSet generated by a call to next.
func (d *resumableStreamDecoder) get() *sppb.PartialResultSet {
	return d.np
}

// lastErr returns the last non-EOF error encountered.
func (d *resumableStreamDecoder) lastErr() error {
	return d.err
}

// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner
// Rows.
type partialResultSetDecoder struct {
	row     Row
	tx      *sppb.Transaction
	chunked bool      // if true, the next value should be merged with the last vals entry.
	ts      time.Time // read timestamp
}

// yield checks whether we have a complete row, and if so returns it. A row is
// not complete if it doesn't yet have enough columns, or if this is a chunked
// response and the last value still needs to be merged with the next
// PartialResultSet.
func (p *partialResultSetDecoder) yield(chunked, last bool) *Row {
	if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) {
		// When partialResultSetDecoder has received enough column values,
		// a new Row should be yielded in two cases:
		//   1. The incoming PartialResultSet is not chunked;
		//   2. The incoming PartialResultSet is chunked, but the
		//      proto3.Value being merged is not the last one in
		//      the PartialResultSet.
		//
		// Use a fresh Row to simplify clients that want to use yielded results
		// after the next row is retrieved. Note that fields is never changed
		// so it doesn't need to be copied.
		fresh := Row{
			fields: p.row.fields,
			vals:   make([]*proto3.Value, len(p.row.vals)),
		}
		copy(fresh.vals, p.row.vals)
		p.row.vals = p.row.vals[:0] // empty and reuse slice
		return &fresh
	}
	return nil
}

// errChunkedEmptyRow returns the error for a chunked PartialResultSet that
// carries an empty row.
func errChunkedEmptyRow() error {
	return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row")
}

// add tries to merge a new PartialResultSet into the buffered Row. It returns
// any rows that have been completed as a result.
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) {
	var rows []*Row
	if r.Metadata != nil {
		// Metadata should only be returned in the first result.
		if p.row.fields == nil {
			p.row.fields = r.Metadata.RowType.Fields
		}
		if p.tx == nil && r.Metadata.Transaction != nil {
			p.tx = r.Metadata.Transaction
			if p.tx.ReadTimestamp != nil {
				p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos))
			}
		}
	}
	if len(r.Values) == 0 {
		return nil, nil
	}
	if p.chunked {
		p.chunked = false
		// Try to merge the first value in r.Values into the
		// uncompleted row.
		last := len(p.row.vals) - 1
		if last < 0 { // sanity check
			return nil, errChunkedEmptyRow()
		}
		var err error
		// If p is chunked, then we should always try to merge p.last with r.first.
		if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
			return nil, err
		}
		r.Values = r.Values[1:]
		// The merge is done; try to yield a complete Row.
		if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil {
			rows = append(rows, row)
		}
	}
	for i, v := range r.Values {
		// The remaining values in r can be appended to p directly.
		p.row.vals = append(p.row.vals, v)
		// Again, check to see if a complete Row can be yielded because of
		// the newly added value.
		if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil {
			rows = append(rows, row)
		}
	}
	if r.ChunkedValue {
		// After dealing with all values in r, if r is chunked then p must
		// also be chunked.
		p.chunked = true
	}
	return rows, nil
}
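// A worked example of the chunking behavior above, for illustration only. With
// two columns per row, a PartialResultSet carrying values ["a", "b", "c"] with
// ChunkedValue set, followed by one carrying ["d", "e"], decodes as:
//
//	add(#1): yields row ("a", "b"); "c" stays buffered and p becomes chunked.
//	add(#2): "c" is merged with "d" into "cd"; yields row ("cd", "e").
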

// isMergeable reports whether a protobuf Value can potentially be merged with
// other protobuf Values.
func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool {
	switch a.Kind.(type) {
	case *proto3.Value_StringValue:
		return true
	case *proto3.Value_ListValue:
		return true
	default:
		return false
	}
}

// errIncompatibleMergeTypes returns the error for incompatible protobuf types
// that cannot be merged by partialResultSetDecoder.
func errIncompatibleMergeTypes(a, b *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind)
}

// errUnsupportedMergeType returns the error for a protobuf type that cannot be
// merged with other protobuf Values.
func errUnsupportedMergeType(a *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind)
}

// merge tries to combine two protobuf Values if possible.
func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) {
	var err error
	typeErr := errIncompatibleMergeTypes(a, b)
	switch t := a.Kind.(type) {
	case *proto3.Value_StringValue:
		s, ok := b.Kind.(*proto3.Value_StringValue)
		if !ok {
			return nil, typeErr
		}
		return &proto3.Value{
			Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue},
		}, nil
	case *proto3.Value_ListValue:
		l, ok := b.Kind.(*proto3.Value_ListValue)
		if !ok {
			return nil, typeErr
		}
		if l.ListValue == nil || len(l.ListValue.Values) <= 0 {
			// b is an empty list, just return a.
			return a, nil
		}
		if t.ListValue == nil || len(t.ListValue.Values) <= 0 {
			// a is an empty list, just return b.
			return b, nil
		}
		if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) {
			// When the last item in a is of type String, List or Struct
			// (encoded as List by Cloud Spanner), try to merge the last
			// item in a with the first item in b.
			t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0])
			if err != nil {
				return nil, err
			}
			l.ListValue.Values = l.ListValue.Values[1:]
		}
		return &proto3.Value{
			Kind: &proto3.Value_ListValue{
				ListValue: &proto3.ListValue{
					Values: append(t.ListValue.Values, l.ListValue.Values...),
				},
			},
		}, nil
	default:
		return nil, errUnsupportedMergeType(a)
	}
}
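// Illustrative merge behavior, in loose pseudo-notation (not exhaustive):
// string values concatenate, and list values are joined after merging their
// boundary elements when those are themselves mergeable.
//
//	merge("foo", "bar")            // -> "foobar"
//	merge(["a", "b"], ["c", "d"])  // -> ["a", "bc", "d"] (boundary strings merge)
//	merge([1, 2], [3])             // -> [1, 2, 3]        (a number is not mergeable)
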

// done reports whether partialResultSetDecoder has finished processing all
// buffered values.
func (p *partialResultSetDecoder) done() bool {
	// There is no explicit end-of-stream marker, but ending partway through a
	// row, or with the last column still awaiting completion, indicates an
	// incomplete result.
	return len(p.row.vals) == 0 && !p.chunked
}