/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"io"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/internal/protostruct"
	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/proto"
	proto3 "github.com/golang/protobuf/ptypes/struct"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/api/iterator"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
)

// streamingReceiver is the interface for receiving data from a client-side
// stream.
type streamingReceiver interface {
	Recv() (*sppb.PartialResultSet, error)
}

// errEarlyReadEnd returns the error for a read that finishes while the gRPC
// stream is still active.
func errEarlyReadEnd() error {
	return spannerErrorf(codes.FailedPrecondition, "read completed with active stream")
}

// stream is the internal fault-tolerant method for streaming data from Cloud
// Spanner.
func stream(
	ctx context.Context,
	logger *log.Logger,
	rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
	setTimestamp func(time.Time),
	release func(error),
) *RowIterator {
	return streamWithReplaceSessionFunc(
		ctx,
		logger,
		rpc,
		nil,
		setTimestamp,
		release,
	)
}
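
// The rpc callback passed to stream (and to streamWithReplaceSessionFunc
// below) is expected to open, or re-open, the gRPC stream, honoring the
// resume token when one is supplied. A rough, illustrative sketch only; the
// req, sessionName and spannerClient identifiers are assumptions, not the
// actual call sites in this package:
//
//	rpc := func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
//		req.ResumeToken = resumeToken // resume where the previous attempt left off
//		req.Session = sessionName
//		return spannerClient.ExecuteStreamingSql(ctx, req)
//	}
//	iter := stream(ctx, logger, rpc, setTimestamp, release)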

// streamWithReplaceSessionFunc creates a RowIterator that will automatically
// retry the stream on a new session if the replaceSessionFunc function has
// been defined. It should only be used for single-use transactions.
func streamWithReplaceSessionFunc(
	ctx context.Context,
	logger *log.Logger,
	rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
	replaceSession func(ctx context.Context) error,
	setTimestamp func(time.Time),
	release func(error),
) *RowIterator {
	ctx, cancel := context.WithCancel(ctx)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
	return &RowIterator{
		streamd:      newResumableStreamDecoder(ctx, logger, rpc, replaceSession),
		rowd:         &partialResultSetDecoder{},
		setTimestamp: setTimestamp,
		release:      release,
		cancel:       cancel,
	}
}

// RowIterator is an iterator over Rows.
type RowIterator struct {
	// The plan for the query. Available after RowIterator.Next returns
	// iterator.Done if QueryWithStats was called.
	QueryPlan *sppb.QueryPlan

	// Execution statistics for the query. Available after RowIterator.Next
	// returns iterator.Done if QueryWithStats was called.
	QueryStats map[string]interface{}

	// For a DML statement, the number of rows affected. For Partitioned DML,
	// this is a lower bound. Available for DML statements after
	// RowIterator.Next returns iterator.Done.
	RowCount int64

	streamd      *resumableStreamDecoder
	rowd         *partialResultSetDecoder
	setTimestamp func(time.Time)
	release      func(error)
	cancel       func()
	err          error
	rows         []*Row
	sawStats     bool
}
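
// As an illustrative sketch only, the exported stats fields above are read
// after Next has returned iterator.Done, for example for a query started with
// QueryWithStats (client and stmt are assumed to exist in the caller's code):
//
//	iter := client.Single().QueryWithStats(ctx, stmt)
//	defer iter.Stop()
//	for {
//		_, err := iter.Next()
//		if err == iterator.Done {
//			break // QueryPlan and QueryStats are now populated
//		}
//		if err != nil {
//			// handle err
//		}
//	}
//	plan, stats := iter.QueryPlan, iter.QueryStats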

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *RowIterator) Next() (*Row, error) {
	if r.err != nil {
		return nil, r.err
	}
	for len(r.rows) == 0 && r.streamd.next() {
		prs := r.streamd.get()
		if prs.Stats != nil {
			r.sawStats = true
			r.QueryPlan = prs.Stats.QueryPlan
			r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats)
			if prs.Stats.RowCount != nil {
				rc, err := extractRowCount(prs.Stats)
				if err != nil {
					return nil, err
				}
				r.RowCount = rc
			}
		}
		r.rows, r.err = r.rowd.add(prs)
		if r.err != nil {
			return nil, r.err
		}
		if !r.rowd.ts.IsZero() && r.setTimestamp != nil {
			r.setTimestamp(r.rowd.ts)
			r.setTimestamp = nil
		}
	}
	if len(r.rows) > 0 {
		row := r.rows[0]
		r.rows = r.rows[1:]
		return row, nil
	}
	if err := r.streamd.lastErr(); err != nil {
		r.err = toSpannerError(err)
	} else if !r.rowd.done() {
		r.err = errEarlyReadEnd()
	} else {
		r.err = iterator.Done
	}
	return nil, r.err
}
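
// From a caller's point of view, the typical Next loop looks like the
// following sketch (client creation and error handling are elided; the
// statement is illustrative):
//
//	iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1"})
//	defer iter.Stop()
//	for {
//		row, err := iter.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		// use row
//	}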

func extractRowCount(stats *sppb.ResultSetStats) (int64, error) {
	if stats.RowCount == nil {
		return 0, spannerErrorf(codes.Internal, "missing RowCount")
	}
	switch rc := stats.RowCount.(type) {
	case *sppb.ResultSetStats_RowCountExact:
		return rc.RowCountExact, nil
	case *sppb.ResultSetStats_RowCountLowerBound:
		return rc.RowCountLowerBound, nil
	default:
		return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount)
	}
}

// Do calls the provided function once in sequence for each row in the
// iteration. If the function returns a non-nil error, Do immediately returns
// that error.
//
// If there are no rows in the iterator, Do will return nil without calling the
// provided function.
//
// Do always calls Stop on the iterator.
func (r *RowIterator) Do(f func(r *Row) error) error {
	defer r.Stop()
	for {
		row, err := r.Next()
		switch err {
		case iterator.Done:
			return nil
		case nil:
			if err = f(row); err != nil {
				return err
			}
		default:
			return err
		}
	}
}
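
// The same iteration written with Do, as a short sketch (client creation and
// error handling elided):
//
//	err := iter.Do(func(r *spanner.Row) error {
//		var v int64
//		return r.Column(0, &v)
//	})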

// Stop terminates the iteration. It should be called after you finish using the
// iterator.
func (r *RowIterator) Stop() {
	if r.streamd != nil {
		defer trace.EndSpan(r.streamd.ctx, r.err)
	}
	if r.cancel != nil {
		r.cancel()
	}
	if r.release != nil {
		r.release(r.err)
		if r.err == nil {
			r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
		}
		r.release = nil
	}
}

// partialResultQueue implements a simple FIFO queue.  The zero value is a valid
// queue.
type partialResultQueue struct {
	q     []*sppb.PartialResultSet
	first int
	last  int
	n     int // number of elements in queue
}

// empty reports whether the partialResultQueue is empty.
func (q *partialResultQueue) empty() bool {
	return q.n == 0
}

// errEmptyQueue returns the error for dequeuing an empty queue.
func errEmptyQueue() error {
	return spannerErrorf(codes.OutOfRange, "empty partialResultQueue")
}

// peekLast returns the last item in partialResultQueue; if the queue
// is empty, it returns an error.
func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) {
	if q.empty() {
		return nil, errEmptyQueue()
	}
	return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil
}

// push adds an item to the tail of partialResultQueue.
func (q *partialResultQueue) push(r *sppb.PartialResultSet) {
	if q.q == nil {
		q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */)
	}
	if q.n == cap(q.q) {
		buf := make([]*sppb.PartialResultSet, cap(q.q)*2)
		for i := 0; i < q.n; i++ {
			buf[i] = q.q[(q.first+i)%cap(q.q)]
		}
		q.q = buf
		q.first = 0
		q.last = q.n
	}
	q.q[q.last] = r
	q.last = (q.last + 1) % cap(q.q)
	q.n++
}

// pop removes an item from the head of partialResultQueue and returns it.
func (q *partialResultQueue) pop() *sppb.PartialResultSet {
	if q.n == 0 {
		return nil
	}
	r := q.q[q.first]
	q.q[q.first] = nil
	q.first = (q.first + 1) % cap(q.q)
	q.n--
	return r
}

// clear empties partialResultQueue.
func (q *partialResultQueue) clear() {
	*q = partialResultQueue{}
}

// dump retrieves all items from partialResultQueue and returns them in a slice.
// It is used only in tests.
func (q *partialResultQueue) dump() []*sppb.PartialResultSet {
	var dq []*sppb.PartialResultSet
	for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) {
		dq = append(dq, q.q[i])
	}
	return dq
}
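
// The queue is a growable ring buffer: push writes at q.last, pop reads at
// q.first, and both indices wrap around cap(q.q). A minimal usage sketch:
//
//	var q partialResultQueue
//	q.push(&sppb.PartialResultSet{ResumeToken: []byte("a")})
//	q.push(&sppb.PartialResultSet{ResumeToken: []byte("b")})
//	last, _ := q.peekLast() // the "b" item; the queue is left unchanged
//	first := q.pop()        // the "a" item; q.n is now 1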

// resumableStreamDecoderState encodes resumableStreamDecoder's status. See also
// the comments for resumableStreamDecoder.next.
type resumableStreamDecoderState int

const (
	unConnected         resumableStreamDecoderState = iota // 0
	queueingRetryable                                      // 1
	queueingUnretryable                                    // 2
	aborted                                                // 3
	finished                                               // 4
)

// resumableStreamDecoder provides a resumable interface for receiving
// sppb.PartialResultSet(s) from a given query wrapped by
// resumableStreamDecoder.rpc().
type resumableStreamDecoder struct {
	// state is the current status of resumableStreamDecoder, see also
	// the comments for resumableStreamDecoder.next.
	state resumableStreamDecoderState

	// stateWitness when non-nil is called to observe state change,
	// used for testing.
	stateWitness func(resumableStreamDecoderState)

	// ctx is the caller's context, used to cancel or time out Next().
	ctx context.Context

	// rpc is a factory of streamingReceiver, which might resume
	// a previous stream from the point encoded in restartToken.
	// rpc is always a wrapper of a Cloud Spanner query which is
	// resumable.
	rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error)

	// replaceSessionFunc is a function that can be used to replace the session
	// that is being used to execute the read operation. This function should
	// only be defined for single-use transactions that can safely retry the
	// read operation on a new session. If this function is nil, the stream
	// does not support retrying the query on a new session.
	replaceSessionFunc func(ctx context.Context) error

	// logger is the logger to use.
	logger *log.Logger

	// stream is the current RPC streaming receiver.
	stream streamingReceiver

	// q buffers received yet undecoded partial results.
	q partialResultQueue

	// bytesBetweenResumeTokens is a proxy for the byte size of the
	// PartialResultSets being queued between two resume tokens. Once
	// bytesBetweenResumeTokens is greater than maxBytesBetweenResumeTokens,
	// resumableStreamDecoder goes into queueingUnretryable state.
	bytesBetweenResumeTokens int32

	// maxBytesBetweenResumeTokens is the max number of bytes that can be
	// buffered between two resume tokens. It is always copied from the global
	// maxBytesBetweenResumeTokens atomically.
	maxBytesBetweenResumeTokens int32

	// np is the next sppb.PartialResultSet ready to be returned
	// to the caller of resumableStreamDecoder.get().
	np *sppb.PartialResultSet

	// resumeToken stores the resume token that resumableStreamDecoder has
	// last revealed to the caller.
	resumeToken []byte

	// err is the last error resumableStreamDecoder has encountered so far.
	err error

	// backoff is used for the retry settings.
	backoff gax.Backoff
}

// newResumableStreamDecoder creates a new resumableStreamDecoder instance.
// Parameter rpc should be a function that creates a new stream beginning at the
// restartToken if non-nil.
func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder {
	return &resumableStreamDecoder{
		ctx:                         ctx,
		logger:                      logger,
		rpc:                         rpc,
		replaceSessionFunc:          replaceSession,
		maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
		backoff:                     DefaultRetryBackoff,
	}
}

// changeState performs a state transition for resumableStreamDecoder.
func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) {
	if d.state == queueingRetryable && d.state != target {
		// Reset bytesBetweenResumeTokens because it is only meaningful/changed
		// under queueingRetryable state.
		d.bytesBetweenResumeTokens = 0
	}
	d.state = target
	if d.stateWitness != nil {
		d.stateWitness(target)
	}
}

// isNewResumeToken reports whether the observed resume token is different from
// the one the server returned last time.
func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool {
	if rt == nil {
		return false
	}
	if bytes.Equal(rt, d.resumeToken) {
		return false
	}
	return true
}

// next advances to the next available partial result set. If there is an error
// or no more results, it returns false; call lastErr to determine whether an
// error was encountered. The following diagram illustrates the state machine
// of resumableStreamDecoder that next() implements. Note that a state
// transition can only be triggered by RPC activity.
/*
        rpc() fails retryable
      +---------+
      |         |    rpc() fails unretryable/ctx timeouts or cancelled
      |         |   +------------------------------------------------+
      |         |   |                                                |
      |         v   |                                                v
      |     +---+---+---+                       +--------+    +------+--+
      +-----+unConnected|                       |finished|    | aborted |<----+
            |           |                       ++-----+-+    +------+--+     |
            +---+----+--+                        ^     ^             ^        |
                |    ^                           |     |             |        |
                |    |                           |     |     recv() fails     |
                |    |                           |     |             |        |
                |    |recv() fails retryable     |     |             |        |
                |    |with valid ctx             |     |             |        |
                |    |                           |     |             |        |
      rpc() succeeds |   +-----------------------+     |             |        |
                |    |   |         recv EOF         recv EOF         |        |
                |    |   |                             |             |        |
                v    |   |     Queue size exceeds      |             |        |
            +---+----+---+----+threshold       +-------+-----------+ |        |
+---------->+                 +--------------->+                   +-+        |
|           |queueingRetryable|                |queueingUnretryable|          |
|           |                 +<---------------+                   |          |
|           +---+----------+--+ pop() returns  +--+----+-----------+          |
|               |          |    resume token      |    ^                      |
|               |          |                      |    |                      |
|               |          |                      |    |                      |
+---------------+          |                      |    |                      |
   recv() succeeds         |                      +----+                      |
                           |                      recv() succeeds             |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           +--------------------------------------------------+
                                               recv() fails unretryable

*/
var (
	// maxBytesBetweenResumeTokens is the maximum number of bytes that
	// resumableStreamDecoder in queueingRetryable state can use to queue
	// PartialResultSets before getting into queueingUnretryable state.
	maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024)
)
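
// Because each decoder copies maxBytesBetweenResumeTokens atomically at
// construction time, a test that wants to force the queueingUnretryable
// transition can lower the global value before building the decoder. A sketch
// only, not taken from an existing test:
//
//	old := atomic.LoadInt32(&maxBytesBetweenResumeTokens)
//	atomic.StoreInt32(&maxBytesBetweenResumeTokens, 1)
//	defer atomic.StoreInt32(&maxBytesBetweenResumeTokens, old)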

func (d *resumableStreamDecoder) next() bool {
	retryer := onCodes(d.backoff, codes.Unavailable, codes.Internal)
	for {
		switch d.state {
		case unConnected:
			// If no gRPC stream is available, try to initiate one.
			d.stream, d.err = d.rpc(d.ctx, d.resumeToken)
			if d.err == nil {
				d.changeState(queueingRetryable)
				continue
			}
			delay, shouldRetry := retryer.Retry(d.err)
			if !shouldRetry {
				d.changeState(aborted)
				continue
			}
			trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay)
			if err := gax.Sleep(d.ctx, delay); err == nil {
				// Be explicit about state transition, although the
				// state doesn't actually change. State transition
				// will be triggered only by RPC activity, regardless of
				// whether there is an actual state change or not.
				d.changeState(unConnected)
			} else {
				d.err = err
				d.changeState(aborted)
			}
			continue

		case queueingRetryable:
			fallthrough
		case queueingUnretryable:
			// Check whether the receiving queue still has buffered items.
			last, err := d.q.peekLast()
			if err != nil {
				// Only an empty receiving queue can cause peekLast to return
				// an error; in that case, try to receive from the stream.
				d.tryRecv(retryer)
				continue
			}
			if d.isNewResumeToken(last.ResumeToken) {
				// Got a new resume token, return buffered
				// sppb.PartialResultSets to the caller.
				d.np = d.q.pop()
				if d.q.empty() {
					d.bytesBetweenResumeTokens = 0
					// The new resume token was just popped out from queue,
					// record it.
					d.resumeToken = d.np.ResumeToken
					d.changeState(queueingRetryable)
				}
				return true
			}
			if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable {
				d.changeState(queueingUnretryable)
				continue
			}
			if d.state == queueingUnretryable {
				// When no resume token has been observed, only yield
				// sppb.PartialResultSets to the caller in the
				// queueingUnretryable state.
				d.np = d.q.pop()
				return true
			}
			// Need to receive more from the gRPC stream until a new resume
			// token is observed.
			d.tryRecv(retryer)
			continue
		case aborted:
			// Discard all pending items because none of them should be
			// yielded to the caller.
			d.q.clear()
			return false
		case finished:
			// If the query has finished, check if there are still buffered
			// messages.
			if d.q.empty() {
				// No buffered PartialResultSet.
				return false
			}
			// Although the query has finished, there are still buffered
			// PartialResultSets.
			d.np = d.q.pop()
			return true

		default:
			logf(d.logger, "Unexpected resumableStreamDecoder.state: %v", d.state)
			return false
		}
	}
}

// tryRecv attempts to receive a PartialResultSet from the gRPC stream.
func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
	var res *sppb.PartialResultSet
	res, d.err = d.stream.Recv()
	if d.err == nil {
		d.q.push(res)
		if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) {
			d.bytesBetweenResumeTokens += int32(proto.Size(res))
		}
		d.changeState(d.state)
		return
	}
	if d.err == io.EOF {
		d.err = nil
		d.changeState(finished)
		return
	}
	if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil {
		// A 'Session not found' error occurred before we received a resume
		// token and a replaceSessionFunc function is defined. Try to restart
		// the stream on a new session.
		if err := d.replaceSessionFunc(d.ctx); err != nil {
			d.err = err
			d.changeState(aborted)
			return
		}
	} else {
		delay, shouldRetry := retryer.Retry(d.err)
		if !shouldRetry || d.state != queueingRetryable {
			d.changeState(aborted)
			return
		}
		if err := gax.Sleep(d.ctx, delay); err != nil {
			d.err = err
			d.changeState(aborted)
			return
		}
	}
	// Clear error and retry the stream.
	d.err = nil
	// Discard all queue items (none have resume tokens).
	d.q.clear()
	d.stream = nil
	d.changeState(unConnected)
}

// get returns the most recent PartialResultSet generated by a call to next.
func (d *resumableStreamDecoder) get() *sppb.PartialResultSet {
	return d.np
}

// lastErr returns the last non-EOF error encountered.
func (d *resumableStreamDecoder) lastErr() error {
	return d.err
}

// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner
// Rows.
type partialResultSetDecoder struct {
	row     Row
	tx      *sppb.Transaction
	chunked bool      // if true, the next value should be merged with the last values entry
	ts      time.Time // read timestamp
}

// yield checks whether we have a complete row, and if so returns it. A row is
// not complete if it doesn't have enough columns, or if this is a chunked
// response and there are no further values to process.
func (p *partialResultSetDecoder) yield(chunked, last bool) *Row {
	if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) {
		// partialResultSetDecoder has received the expected number of column
		// values. There are two cases in which a new Row should be yielded:
		//
		//   1. The incoming PartialResultSet is not chunked;
		//   2. The incoming PartialResultSet is chunked, but the
		//      proto3.Value being merged is not the last one in
		//      the PartialResultSet.
		//
		// Use a fresh Row to simplify clients that want to use yielded results
		// after the next row is retrieved. Note that fields is never changed
		// so it doesn't need to be copied.
		fresh := Row{
			fields: p.row.fields,
			vals:   make([]*proto3.Value, len(p.row.vals)),
		}
		copy(fresh.vals, p.row.vals)
		p.row.vals = p.row.vals[:0] // empty and reuse slice
		return &fresh
	}
	return nil
}

// errChunkedEmptyRow returns the error for a chunked PartialResultSet that
// arrives with an empty row.
func errChunkedEmptyRow() error {
	return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row")
}

// add tries to merge a new PartialResultSet into the buffered Row. It returns
// any rows that have been completed as a result.
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) {
	var rows []*Row
	if r.Metadata != nil {
		// Metadata should only be returned in the first result.
		if p.row.fields == nil {
			p.row.fields = r.Metadata.RowType.Fields
		}
		if p.tx == nil && r.Metadata.Transaction != nil {
			p.tx = r.Metadata.Transaction
			if p.tx.ReadTimestamp != nil {
				p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos))
			}
		}
	}
	if len(r.Values) == 0 {
		return nil, nil
	}
	if p.chunked {
		p.chunked = false
		// Try to merge the first value in r.Values into the incomplete row.
		last := len(p.row.vals) - 1
		if last < 0 { // sanity check
			return nil, errChunkedEmptyRow()
		}
		var err error
		// If p is chunked, then we should always try to merge p.last with
		// r.first.
		if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
			return nil, err
		}
		r.Values = r.Values[1:]
		// Merge is done, try to yield a complete Row.
		if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil {
			rows = append(rows, row)
		}
	}
	for i, v := range r.Values {
		// The remaining values in r can be appended to p directly.
		p.row.vals = append(p.row.vals, v)
		// Again, check to see if a complete Row can be yielded because of the
		// newly added value.
		if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil {
			rows = append(rows, row)
		}
	}
	if r.ChunkedValue {
		// After dealing with all values in r, if r is chunked then p must
		// also be chunked.
		p.chunked = true
	}
	return rows, nil
}
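
// As a concrete illustration of the chunking rules above: if one
// PartialResultSet ends with ChunkedValue set and the string value "Hel", and
// the next one starts with the string value "lo", add merges them into a
// single column value "Hello" before deciding whether a row can be yielded.
// A sketch only, assuming the decoder p has already seen metadata describing
// a single STRING column:
//
//	prs1 := &sppb.PartialResultSet{
//		Values:       []*proto3.Value{{Kind: &proto3.Value_StringValue{StringValue: "Hel"}}},
//		ChunkedValue: true,
//	}
//	prs2 := &sppb.PartialResultSet{
//		Values: []*proto3.Value{{Kind: &proto3.Value_StringValue{StringValue: "lo"}}},
//	}
//	rows1, _ := p.add(prs1) // empty: prs1 is chunked, so the row is incomplete
//	rows2, _ := p.add(prs2) // one row whose single column holds "Hello"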

// isMergeable reports whether a protobuf Value can potentially be merged with
// other protobuf Values.
func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool {
	switch a.Kind.(type) {
	case *proto3.Value_StringValue:
		return true
	case *proto3.Value_ListValue:
		return true
	default:
		return false
	}
}

// errIncompatibleMergeTypes returns the error for incompatible protobuf types
// that cannot be merged by partialResultSetDecoder.
func errIncompatibleMergeTypes(a, b *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind)
}

// errUnsupportedMergeType returns the error for a protobuf type that cannot be
// merged with other protobufs.
func errUnsupportedMergeType(a *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind)
}

// merge tries to combine two protobuf Values if possible.
func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) {
	var err error
	typeErr := errIncompatibleMergeTypes(a, b)
	switch t := a.Kind.(type) {
	case *proto3.Value_StringValue:
		s, ok := b.Kind.(*proto3.Value_StringValue)
		if !ok {
			return nil, typeErr
		}
		return &proto3.Value{
			Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue},
		}, nil
	case *proto3.Value_ListValue:
		l, ok := b.Kind.(*proto3.Value_ListValue)
		if !ok {
			return nil, typeErr
		}
		if l.ListValue == nil || len(l.ListValue.Values) <= 0 {
			// b is an empty list, just return a.
			return a, nil
		}
		if t.ListValue == nil || len(t.ListValue.Values) <= 0 {
			// a is an empty list, just return b.
			return b, nil
		}
		if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) {
			// When the last item in a is of type String, List or Struct
			// (encoded into List by Cloud Spanner), try to merge the last
			// item in a with the first item in b.
			t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0])
			if err != nil {
				return nil, err
			}
			l.ListValue.Values = l.ListValue.Values[1:]
		}
		return &proto3.Value{
			Kind: &proto3.Value_ListValue{
				ListValue: &proto3.ListValue{
					Values: append(t.ListValue.Values, l.ListValue.Values...),
				},
			},
		}, nil
	default:
		return nil, errUnsupportedMergeType(a)
	}
}
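
// For example, merging two chunks of a list-typed column concatenates the
// element slices, first merging the boundary elements when the last element of
// the earlier chunk is itself mergeable (a string or nested list). Sketch:
//
//	a := &proto3.Value{Kind: &proto3.Value_ListValue{ListValue: &proto3.ListValue{
//		Values: []*proto3.Value{{Kind: &proto3.Value_StringValue{StringValue: "ab"}}},
//	}}}
//	b := &proto3.Value{Kind: &proto3.Value_ListValue{ListValue: &proto3.ListValue{
//		Values: []*proto3.Value{{Kind: &proto3.Value_StringValue{StringValue: "cd"}}},
//	}}}
//	merged, _ := p.merge(a, b) // a single-element list whose element is "abcd"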

// done reports whether partialResultSetDecoder has finished processing all
// buffered values.
func (p *partialResultSetDecoder) done() bool {
	// There is no explicit end-of-stream marker, but ending partway through a
	// row, or with the last column still awaiting completion, is clearly bad.
	return len(p.row.vals) == 0 && !p.chunked
}