/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"bytes"
	"context"
	"io"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/internal/protostruct"
	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/proto"
	proto3 "github.com/golang/protobuf/ptypes/struct"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/api/iterator"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
)

// streamingReceiver is the interface for receiving data from a client side
// stream.
type streamingReceiver interface {
	Recv() (*sppb.PartialResultSet, error)
}

// errEarlyReadEnd returns the error for a read that finishes while the gRPC
// stream is still active.
func errEarlyReadEnd() error {
	return spannerErrorf(codes.FailedPrecondition, "read completed with active stream")
}

// stream is the internal fault tolerant method for streaming data from Cloud
// Spanner.
func stream(
	ctx context.Context,
	logger *log.Logger,
	rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
	setTimestamp func(time.Time),
	release func(error),
) *RowIterator {
	return streamWithReplaceSessionFunc(
		ctx,
		logger,
		rpc,
		nil,
		setTimestamp,
		release,
	)
}

// streamWithReplaceSessionFunc starts a stream and automatically retries it on
// a new session if the replaceSession function is non-nil. It should only be
// used for single-use transactions.
func streamWithReplaceSessionFunc(
	ctx context.Context,
	logger *log.Logger,
	rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
	replaceSession func(ctx context.Context) error,
	setTimestamp func(time.Time),
	release func(error),
) *RowIterator {
	ctx, cancel := context.WithCancel(ctx)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
	return &RowIterator{
		streamd:      newResumableStreamDecoder(ctx, logger, rpc, replaceSession),
		rowd:         &partialResultSetDecoder{},
		setTimestamp: setTimestamp,
		release:      release,
		cancel:       cancel,
	}
}

// RowIterator is an iterator over Rows.
type RowIterator struct {
	// The plan for the query. Available after RowIterator.Next returns
	// iterator.Done if QueryWithStats was called.
	QueryPlan *sppb.QueryPlan

	// Execution statistics for the query. Available after RowIterator.Next
	// returns iterator.Done if QueryWithStats was called.
	QueryStats map[string]interface{}

	// For a DML statement, the number of rows affected. For PDML, this is a
	// lower bound. Available for DML statements after RowIterator.Next returns
	// iterator.Done.
	RowCount int64

	// Metadata is the metadata of the query results. It is available after
	// the first call to RowIterator.Next, unless that call returned an error
	// other than iterator.Done.
	Metadata *sppb.ResultSetMetadata

	streamd      *resumableStreamDecoder
	rowd         *partialResultSetDecoder
	setTimestamp func(time.Time)
	release      func(error)
	cancel       func()
	err          error
	rows         []*Row
	sawStats     bool
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
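//
// A typical loop looks like the following (an illustrative sketch; it assumes
// a *Client named client, a context named ctx, and a Statement named stmt):
//
//	iter := client.Single().Query(ctx, stmt)
//	defer iter.Stop()
//	for {
//		row, err := iter.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			// TODO: handle the error.
//		}
//		// TODO: use the row.
//	}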
func (r *RowIterator) Next() (*Row, error) {
	if r.err != nil {
		return nil, r.err
	}
	for len(r.rows) == 0 && r.streamd.next() {
		prs := r.streamd.get()
		if prs.Stats != nil {
			r.sawStats = true
			r.QueryPlan = prs.Stats.QueryPlan
			r.QueryStats = protostruct.DecodeToMap(prs.Stats.QueryStats)
			if prs.Stats.RowCount != nil {
				rc, err := extractRowCount(prs.Stats)
				if err != nil {
					return nil, err
				}
				r.RowCount = rc
			}
		}
		var metadata *sppb.ResultSetMetadata
		r.rows, metadata, r.err = r.rowd.add(prs)
		if metadata != nil {
			r.Metadata = metadata
		}
		if r.err != nil {
			return nil, r.err
		}
		if !r.rowd.ts.IsZero() && r.setTimestamp != nil {
			r.setTimestamp(r.rowd.ts)
			r.setTimestamp = nil
		}
	}
	if len(r.rows) > 0 {
		row := r.rows[0]
		r.rows = r.rows[1:]
		return row, nil
	}
	if err := r.streamd.lastErr(); err != nil {
		r.err = ToSpannerError(err)
	} else if !r.rowd.done() {
		r.err = errEarlyReadEnd()
	} else {
		r.err = iterator.Done
	}
	return nil, r.err
}

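// extractRowCount extracts the number of affected rows from the statistics of
// a DML statement, handling both exact counts and the lower bounds reported
// for Partitioned DML.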
func extractRowCount(stats *sppb.ResultSetStats) (int64, error) {
	if stats.RowCount == nil {
		return 0, spannerErrorf(codes.Internal, "missing RowCount")
	}
	switch rc := stats.RowCount.(type) {
	case *sppb.ResultSetStats_RowCountExact:
		return rc.RowCountExact, nil
	case *sppb.ResultSetStats_RowCountLowerBound:
		return rc.RowCountLowerBound, nil
	default:
		return 0, spannerErrorf(codes.Internal, "unknown RowCount type %T", stats.RowCount)
	}
}

// Do calls the provided function once in sequence for each row in the
// iteration. If the function returns a non-nil error, Do immediately returns
// that error.
//
// If there are no rows in the iterator, Do will return nil without calling the
// provided function.
//
// Do always calls Stop on the iterator.
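//
// For example (an illustrative sketch; it assumes the row has an INT64 column
// in position 0):
//
//	err := iter.Do(func(row *Row) error {
//		var id int64
//		if err := row.Column(0, &id); err != nil {
//			return err
//		}
//		// TODO: use id.
//		return nil
//	})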
func (r *RowIterator) Do(f func(r *Row) error) error {
	defer r.Stop()
	for {
		row, err := r.Next()
		switch err {
		case iterator.Done:
			return nil
		case nil:
			if err = f(row); err != nil {
				return err
			}
		default:
			return err
		}
	}
}

// Stop terminates the iteration. It should be called after you finish using the
// iterator.
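// A common pattern is to defer Stop immediately after obtaining the iterator
// (an illustrative sketch; client, ctx and stmt are placeholders):
//
//	iter := client.Single().Query(ctx, stmt)
//	defer iter.Stop()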
func (r *RowIterator) Stop() {
	if r.streamd != nil {
		if r.err != nil && r.err != iterator.Done {
			defer trace.EndSpan(r.streamd.ctx, r.err)
		} else {
			defer trace.EndSpan(r.streamd.ctx, nil)
		}
	}
	if r.cancel != nil {
		r.cancel()
	}
	if r.release != nil {
		r.release(r.err)
		if r.err == nil {
			r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
		}
		r.release = nil
	}
}

// partialResultQueue implements a simple FIFO queue.  The zero value is a valid
// queue.
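//
// Internally it is a growable ring buffer: elements occupy q[first],
// q[(first+1)%cap(q.q)], and so on, wrapping around the end of the backing
// slice, which push doubles whenever it fills up.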
type partialResultQueue struct {
	q     []*sppb.PartialResultSet
	first int
	last  int
	n     int // number of elements in queue
}

// empty reports whether the partialResultQueue is empty.
func (q *partialResultQueue) empty() bool {
	return q.n == 0
}

// errEmptyQueue returns the error for dequeuing from an empty queue.
func errEmptyQueue() error {
	return spannerErrorf(codes.OutOfRange, "empty partialResultQueue")
}

// peekLast returns the last item in partialResultQueue; if the queue
// is empty, it returns an error.
func (q *partialResultQueue) peekLast() (*sppb.PartialResultSet, error) {
	if q.empty() {
		return nil, errEmptyQueue()
	}
	return q.q[(q.last+cap(q.q)-1)%cap(q.q)], nil
}

// push adds an item to the tail of partialResultQueue.
func (q *partialResultQueue) push(r *sppb.PartialResultSet) {
	if q.q == nil {
		q.q = make([]*sppb.PartialResultSet, 8 /* arbitrary */)
	}
	if q.n == cap(q.q) {
		// The queue is full; double the backing slice and unwrap the ring
		// into the front of the new buffer.
		buf := make([]*sppb.PartialResultSet, cap(q.q)*2)
		for i := 0; i < q.n; i++ {
			buf[i] = q.q[(q.first+i)%cap(q.q)]
		}
		q.q = buf
		q.first = 0
		q.last = q.n
	}
	q.q[q.last] = r
	q.last = (q.last + 1) % cap(q.q)
	q.n++
}

// pop removes an item from the head of partialResultQueue and returns it.
func (q *partialResultQueue) pop() *sppb.PartialResultSet {
	if q.n == 0 {
		return nil
	}
	r := q.q[q.first]
	q.q[q.first] = nil
	q.first = (q.first + 1) % cap(q.q)
	q.n--
	return r
}

// clear empties partialResultQueue.
func (q *partialResultQueue) clear() {
	*q = partialResultQueue{}
}

// dump retrieves all items from partialResultQueue and returns them in a
// slice. It is used only in tests.
func (q *partialResultQueue) dump() []*sppb.PartialResultSet {
	var dq []*sppb.PartialResultSet
	for i := q.first; len(dq) < q.n; i = (i + 1) % cap(q.q) {
		dq = append(dq, q.q[i])
	}
	return dq
}

// resumableStreamDecoderState encodes resumableStreamDecoder's status. See also
// the comments for resumableStreamDecoder.next.
type resumableStreamDecoderState int

const (
	unConnected         resumableStreamDecoderState = iota // 0: no gRPC stream established yet
	queueingRetryable                                      // 1: stream established; queued results can be replayed via resume token
	queueingUnretryable                                    // 2: queued results have outgrown the resume-token threshold; retry no longer possible
	aborted                                                // 3: stream ended with an unretryable error
	finished                                               // 4: stream ended normally
)

// resumableStreamDecoder provides a resumable interface for receiving
// sppb.PartialResultSet(s) from a given query wrapped by
// resumableStreamDecoder.rpc().
type resumableStreamDecoder struct {
	// state is the current status of resumableStreamDecoder; see also
	// the comments for resumableStreamDecoder.next.
	state resumableStreamDecoderState

	// stateWitness when non-nil is called to observe state change,
	// used for testing.
	stateWitness func(resumableStreamDecoderState)

	// ctx is the caller's context, used to cancel or time out next().
	ctx context.Context

	// rpc is a factory of streamingReceiver, which might resume
	// a previous stream from the point encoded in restartToken.
	// rpc always wraps a resumable Cloud Spanner query.
	rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error)

	// replaceSessionFunc is a function that can be used to replace the session
	// that is being used to execute the read operation. This function should
	// only be defined for single-use transactions that can safely retry the
	// read operation on a new session. If this function is nil, the stream
	// does not support retrying the query on a new session.
	replaceSessionFunc func(ctx context.Context) error

	// logger is the logger to use.
	logger *log.Logger

	// stream is the current RPC streaming receiver.
	stream streamingReceiver

	// q buffers received yet undecoded partial results.
	q partialResultQueue

	// bytesBetweenResumeTokens is a proxy for the byte size of the
	// PartialResultSets being queued between two resume tokens. Once
	// bytesBetweenResumeTokens is greater than maxBytesBetweenResumeTokens,
	// resumableStreamDecoder goes into queueingUnretryable state.
	bytesBetweenResumeTokens int32

	// maxBytesBetweenResumeTokens is the max number of bytes that can be
	// buffered between two resume tokens. It is always copied from the global
	// maxBytesBetweenResumeTokens atomically.
	maxBytesBetweenResumeTokens int32

	// np is the next sppb.PartialResultSet ready to be returned
	// to the caller of resumableStreamDecoder.get().
	np *sppb.PartialResultSet

	// resumeToken stores the resume token that resumableStreamDecoder has
	// last revealed to the caller.
	resumeToken []byte

	// err is the last error resumableStreamDecoder has encountered so far.
	err error

	// backoff is used for the retry settings.
	backoff gax.Backoff
}

// newResumableStreamDecoder creates a new resumableStreamDecoder instance.
// Parameter rpc should be a function that creates a new stream beginning at
// the restartToken if non-nil.
func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder {
	return &resumableStreamDecoder{
		ctx:                         ctx,
		logger:                      logger,
		rpc:                         rpc,
		replaceSessionFunc:          replaceSession,
		maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
		backoff:                     DefaultRetryBackoff,
	}
}

// changeState performs a state transition for resumableStreamDecoder.
func (d *resumableStreamDecoder) changeState(target resumableStreamDecoderState) {
	if d.state == queueingRetryable && d.state != target {
		// Reset bytesBetweenResumeTokens because it is only meaningful/changed
		// under queueingRetryable state.
		d.bytesBetweenResumeTokens = 0
	}
	d.state = target
	if d.stateWitness != nil {
		d.stateWitness(target)
	}
}

// isNewResumeToken reports whether the observed resume token differs from
// the one the server returned last time.
func (d *resumableStreamDecoder) isNewResumeToken(rt []byte) bool {
	if rt == nil {
		return false
	}
	if bytes.Equal(rt, d.resumeToken) {
		return false
	}
	return true
}

// next advances to the next available partial result set. It returns false on
// error or end of stream; call lastErr to determine whether an error was
// encountered.
// The following diagram illustrates the state machine of resumableStreamDecoder
// that next() implements. Note that state transitions can only be triggered by
// RPC activities.
/*
        rpc() fails retryable
      +---------+
      |         |    rpc() fails unretryable/ctx timeouts or cancelled
      |         |   +------------------------------------------------+
      |         |   |                                                |
      |         v   |                                                v
      |     +---+---+---+                       +--------+    +------+--+
      +-----+unConnected|                       |finished|    | aborted |<----+
            |           |                       ++-----+-+    +------+--+     |
            +---+----+--+                        ^     ^             ^        |
                |    ^                           |     |             |        |
                |    |                           |     |     recv() fails     |
                |    |                           |     |             |        |
                |    |recv() fails retryable     |     |             |        |
                |    |with valid ctx             |     |             |        |
                |    |                           |     |             |        |
      rpc() succeeds |   +-----------------------+     |             |        |
                |    |   |         recv EOF         recv EOF         |        |
                |    |   |                             |             |        |
                v    |   |     Queue size exceeds      |             |        |
            +---+----+---+----+threshold       +-------+-----------+ |        |
+---------->+                 +--------------->+                   +-+        |
|           |queueingRetryable|                |queueingUnretryable|          |
|           |                 +<---------------+                   |          |
|           +---+----------+--+ pop() returns  +--+----+-----------+          |
|               |          |    resume token      |    ^                      |
|               |          |                      |    |                      |
|               |          |                      |    |                      |
+---------------+          |                      |    |                      |
   recv() succeeds         |                      +----+                      |
                           |                      recv() succeeds             |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           |                                                  |
                           +--------------------------------------------------+
                                               recv() fails unretryable

*/
var (
	// maxBytesBetweenResumeTokens is the maximum amount of bytes that
	// resumableStreamDecoder in queueingRetryable state can use to queue
	// PartialResultSets before getting into queueingUnretryable state.
	maxBytesBetweenResumeTokens = int32(128 * 1024 * 1024)
)

func (d *resumableStreamDecoder) next() bool {
	retryer := onCodes(d.backoff, codes.Unavailable, codes.Internal)
	for {
		switch d.state {
		case unConnected:
			// If no gRPC stream is available, try to initiate one.
			d.stream, d.err = d.rpc(d.ctx, d.resumeToken)
			if d.err == nil {
				d.changeState(queueingRetryable)
				continue
			}
			delay, shouldRetry := retryer.Retry(d.err)
			if !shouldRetry {
				d.changeState(aborted)
				continue
			}
			trace.TracePrintf(d.ctx, nil, "Backing off stream read for %s", delay)
			if err := gax.Sleep(d.ctx, delay); err == nil {
				// Be explicit about state transition, although the
				// state doesn't actually change. State transition
				// will be triggered only by RPC activity, regardless of
				// whether there is an actual state change or not.
				d.changeState(unConnected)
			} else {
				d.err = err
				d.changeState(aborted)
			}
			continue

		case queueingRetryable:
			fallthrough
		case queueingUnretryable:
			// Check whether the receiving queue is empty by peeking at its
			// last item.
			last, err := d.q.peekLast()
			if err != nil {
				// peekLast returns an error only when the receiving queue is
				// empty; in that case, try to receive from the stream.
				d.tryRecv(retryer)
				continue
			}
			if d.isNewResumeToken(last.ResumeToken) {
				// Got a new resume token, so return buffered
				// sppb.PartialResultSets to the caller.
				d.np = d.q.pop()
				if d.q.empty() {
					d.bytesBetweenResumeTokens = 0
					// The new resume token was just popped out of the queue;
					// record it.
					d.resumeToken = d.np.ResumeToken
					d.changeState(queueingRetryable)
				}
				return true
			}
			if d.bytesBetweenResumeTokens >= d.maxBytesBetweenResumeTokens && d.state == queueingRetryable {
				d.changeState(queueingUnretryable)
				continue
			}
			if d.state == queueingUnretryable {
				// When no resume token has been observed, yield
				// sppb.PartialResultSets to the caller only under the
				// queueingUnretryable state.
				d.np = d.q.pop()
				return true
			}
			// Need to receive more from the gRPC stream until a new resume
			// token is observed.
			d.tryRecv(retryer)
			continue
		case aborted:
			// Discard all pending items because none of them should be
			// yielded to the caller.
			d.q.clear()
			return false
		case finished:
			// If the query has finished, check whether there are still
			// buffered messages.
			if d.q.empty() {
				// No buffered PartialResultSet.
				return false
			}
			// Although the query has finished, there are still buffered
			// PartialResultSets.
			d.np = d.q.pop()
			return true

		default:
			logf(d.logger, "Unexpected resumableStreamDecoder.state: %v", d.state)
			return false
		}
	}
}

// tryRecv attempts to receive a PartialResultSet from the gRPC stream.
func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
	var res *sppb.PartialResultSet
	res, d.err = d.stream.Recv()
	if d.err == nil {
		d.q.push(res)
		if d.state == queueingRetryable && !d.isNewResumeToken(res.ResumeToken) {
			d.bytesBetweenResumeTokens += int32(proto.Size(res))
		}
		d.changeState(d.state)
		return
	}
	if d.err == io.EOF {
		d.err = nil
		d.changeState(finished)
		return
	}
	if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil {
		// A 'Session not found' error occurred before we received a resume
		// token and a replaceSessionFunc function is defined. Try to restart
		// the stream on a new session.
		if err := d.replaceSessionFunc(d.ctx); err != nil {
			d.err = err
			d.changeState(aborted)
			return
		}
	} else {
		delay, shouldRetry := retryer.Retry(d.err)
		if !shouldRetry || d.state != queueingRetryable {
			d.changeState(aborted)
			return
		}
		if err := gax.Sleep(d.ctx, delay); err != nil {
			d.err = err
			d.changeState(aborted)
			return
		}
	}
	// Clear the error and retry the stream.
	d.err = nil
	// Discard all queued items (none of them have resume tokens).
	d.q.clear()
	d.stream = nil
	d.changeState(unConnected)
}

// get returns the most recent PartialResultSet generated by a call to next.
func (d *resumableStreamDecoder) get() *sppb.PartialResultSet {
	return d.np
}

// lastErr returns the last non-EOF error encountered.
func (d *resumableStreamDecoder) lastErr() error {
	return d.err
}

// partialResultSetDecoder assembles PartialResultSet(s) into Cloud Spanner
// Rows.
type partialResultSetDecoder struct {
	row     Row
	tx      *sppb.Transaction
	chunked bool      // if true, the next value should be merged with the last entry in row.vals
	ts      time.Time // read timestamp
}

// yield checks whether we have a complete row, and if so returns it. A row is
// not complete if it doesn't have enough columns, or if this is a chunked
// response and there are no further values to process.
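//
// For example, with a three-column row, receiving the values [a b c a b]
// yields one complete row (a, b, c) and leaves (a, b) buffered until the
// third column arrives.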
func (p *partialResultSetDecoder) yield(chunked, last bool) *Row {
	if len(p.row.vals) == len(p.row.fields) && (!chunked || !last) {
		// When partialResultSetDecoder has received enough column values,
		// there are two cases in which a new Row should be yielded:
		//
		//   1. The incoming PartialResultSet is not chunked;
		//   2. The incoming PartialResultSet is chunked, but the
		//      proto3.Value being merged is not the last one in
		//      the PartialResultSet.
		//
		// Use a fresh Row to simplify clients that want to use yielded results
		// after the next row is retrieved. Note that fields is never changed
		// so it doesn't need to be copied.
		fresh := Row{
			fields: p.row.fields,
			vals:   make([]*proto3.Value, len(p.row.vals)),
		}
		copy(fresh.vals, p.row.vals)
		p.row.vals = p.row.vals[:0] // empty and reuse slice
		return &fresh
	}
	return nil
}

// errChunkedEmptyRow returns the error for receiving a chunked
// PartialResultSet with an empty row.
func errChunkedEmptyRow() error {
	return spannerErrorf(codes.FailedPrecondition, "got invalid chunked PartialResultSet with empty Row")
}

// add tries to merge a new PartialResultSet into the buffered Row. It returns
// any rows that have been completed as a result.
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, *sppb.ResultSetMetadata, error) {
	var rows []*Row
	if r.Metadata != nil {
		// Metadata should only be returned in the first result.
		if p.row.fields == nil {
			p.row.fields = r.Metadata.RowType.Fields
		}
		if p.tx == nil && r.Metadata.Transaction != nil {
			p.tx = r.Metadata.Transaction
			if p.tx.ReadTimestamp != nil {
				p.ts = time.Unix(p.tx.ReadTimestamp.Seconds, int64(p.tx.ReadTimestamp.Nanos))
			}
		}
	}
	if len(r.Values) == 0 {
		return nil, r.Metadata, nil
	}
	if p.chunked {
		p.chunked = false
		// Try to merge the first value in r.Values into the uncompleted row.
		last := len(p.row.vals) - 1
		if last < 0 { // confidence check
			return nil, nil, errChunkedEmptyRow()
		}
		var err error
		// If p is chunked, then we should always try to merge p.last with
		// r.first.
		if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
			return nil, r.Metadata, err
		}
		r.Values = r.Values[1:]
		// The merge is done; try to yield a complete Row.
		if row := p.yield(r.ChunkedValue, len(r.Values) == 0); row != nil {
			rows = append(rows, row)
		}
	}
	for i, v := range r.Values {
		// The remaining values in r can be appended to p directly.
		p.row.vals = append(p.row.vals, v)
		// Again, check to see if a complete Row can be yielded because of the
		// newly added value.
		if row := p.yield(r.ChunkedValue, i == len(r.Values)-1); row != nil {
			rows = append(rows, row)
		}
	}
	if r.ChunkedValue {
		// After dealing with all values in r, if r is chunked then p must
		// also be chunked.
		p.chunked = true
	}
	return rows, r.Metadata, nil
}

// isMergeable reports whether a protobuf Value can potentially be merged with
// other protobuf Values.
func (p *partialResultSetDecoder) isMergeable(a *proto3.Value) bool {
	switch a.Kind.(type) {
	case *proto3.Value_StringValue:
		return true
	case *proto3.Value_ListValue:
		return true
	default:
		return false
	}
}

// errIncompatibleMergeTypes returns the error for incompatible protobuf types
// that cannot be merged by partialResultSetDecoder.
func errIncompatibleMergeTypes(a, b *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "incompatible type in chunked PartialResultSet. expected (%T), got (%T)", a.Kind, b.Kind)
}

// errUnsupportedMergeType returns the error for a protobuf type that cannot be
// merged with other protobufs.
func errUnsupportedMergeType(a *proto3.Value) error {
	return spannerErrorf(codes.FailedPrecondition, "unsupported type merge (%T)", a.Kind)
}

// merge tries to combine two protobuf Values if possible.
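//
// For example, a string value chunked across two PartialResultSets as
// "Hello, " and "World" merges into "Hello, World". Two list values are
// merged by concatenation, except that when the last element of the first
// list is itself mergeable (a string or a list), it is first merged with the
// first element of the second list.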
func (p *partialResultSetDecoder) merge(a, b *proto3.Value) (*proto3.Value, error) {
	var err error
	typeErr := errIncompatibleMergeTypes(a, b)
	switch t := a.Kind.(type) {
	case *proto3.Value_StringValue:
		s, ok := b.Kind.(*proto3.Value_StringValue)
		if !ok {
			return nil, typeErr
		}
		return &proto3.Value{
			Kind: &proto3.Value_StringValue{StringValue: t.StringValue + s.StringValue},
		}, nil
	case *proto3.Value_ListValue:
		l, ok := b.Kind.(*proto3.Value_ListValue)
		if !ok {
			return nil, typeErr
		}
		if l.ListValue == nil || len(l.ListValue.Values) <= 0 {
			// b is an empty list, just return a.
			return a, nil
		}
		if t.ListValue == nil || len(t.ListValue.Values) <= 0 {
			// a is an empty list, just return b.
			return b, nil
		}
		if la := len(t.ListValue.Values) - 1; p.isMergeable(t.ListValue.Values[la]) {
			// When the last item in a is of type String, List or Struct
			// (encoded into List by Cloud Spanner), try to Merge last item in
			// a and first item in b.
			t.ListValue.Values[la], err = p.merge(t.ListValue.Values[la], l.ListValue.Values[0])
			if err != nil {
				return nil, err
			}
			l.ListValue.Values = l.ListValue.Values[1:]
		}
		return &proto3.Value{
			Kind: &proto3.Value_ListValue{
				ListValue: &proto3.ListValue{
					Values: append(t.ListValue.Values, l.ListValue.Values...),
				},
			},
		}, nil
	default:
		return nil, errUnsupportedMergeType(a)
	}
}

// done reports whether partialResultSetDecoder has finished processing all
// buffered values.
func (p *partialResultSetDecoder) done() bool {
	// There is no explicit end-of-stream marker, but ending partway through a
	// row, or with the last column still awaiting completion, is clearly bad.
	return len(p.row.vals) == 0 && !p.chunked
}