// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/googleapis/gax-go/v2"
	"go.opencensus.io/tag"
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// StreamType indicates the type of stream this write client is managing.
type StreamType string

var (
	// DefaultStream most closely mimics the legacy bigquery
	// tabledata.insertAll semantics.  Successful inserts are
	// committed immediately, and there's no tracking of offsets, as
	// all writes go into a "default" stream that always exists
	// for a table.
	DefaultStream StreamType = "DEFAULT"

	// CommittedStream appends data immediately, but creates a
	// discrete stream for the work so that offset tracking can
	// be used to track writes.
	CommittedStream StreamType = "COMMITTED"

	// BufferedStream is a form of checkpointed stream that allows
	// you to advance the offset of visible rows via Flush operations.
	//
	// NOTE: Buffered streams are currently in limited preview, and as such
	// methods like FlushRows() may yield errors for non-enrolled projects.
	BufferedStream StreamType = "BUFFERED"

	// PendingStream is a stream in which no data is made visible to
	// readers until the stream is finalized and committed explicitly.
	PendingStream StreamType = "PENDING"
)
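
// As a rough sketch of how the types differ in practice, a writer bound to a
// pending stream might be constructed as follows.  This assumes the package's
// NewManagedStream constructor and its WithType/WithDestinationTable writer
// options; treat the exact option names as illustrative:
//
//	ms, err := client.NewManagedStream(ctx,
//		WithType(PendingStream),
//		WithDestinationTable("projects/p/datasets/d/tables/t"),
//	)
//	// Rows appended to a pending stream remain invisible to readers
//	// until the stream is finalized and explicitly committed.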

func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
	switch t {
	case CommittedStream:
		return storagepb.WriteStream_COMMITTED
	case PendingStream:
		return storagepb.WriteStream_PENDING
	case BufferedStream:
		return storagepb.WriteStream_BUFFERED
	default:
		return storagepb.WriteStream_TYPE_UNSPECIFIED
	}
}

// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
	streamSettings   *streamSettings
	schemaDescriptor *descriptorpb.DescriptorProto
	destinationTable string
	c                *Client
	fc               *flowController

	// aspects of the stream client
	ctx    context.Context // retained context for the stream
	cancel context.CancelFunc
	open   func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection

	mu          sync.Mutex
	arc         *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
	err         error                                     // terminal error
	pending     chan *pendingWrite                        // writes awaiting status
	streamSetup *sync.Once                                // handles amending the first request in a new stream
}

// streamClientFunc abstracts how we open an AppendRows stream connection,
// which enables substituting the connection in testing.
type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

// streamSettings govern behavior of the append stream RPCs.
type streamSettings struct {

	// streamID contains the reference to the destination stream.
	streamID string

	// streamType governs behavior of the client, such as how
	// offset handling is managed.
	streamType StreamType

	// MaxInflightRequests governs how many unacknowledged
	// append writes can be outstanding in the system.
	MaxInflightRequests int

	// MaxInflightBytes governs how many unacknowledged
	// request bytes can be outstanding in the system.
	MaxInflightBytes int

	// TraceID can be set when appending data on a stream.  Its
	// purpose is to aid in debug and diagnostic scenarios.
	TraceID string

	// dataOrigin can be set for classifying metrics generated
	// by a stream.
	dataOrigin string
}
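
// The two inflight limits above are enforced through the stream's
// flowController.  As a hedged sketch (WithMaxInflightRequests and
// WithMaxInflightBytes are assumed to be exported writer options; verify
// against the package surface), a writer tuned for higher throughput might
// be constructed like:
//
//	ms, err := client.NewManagedStream(ctx,
//		WithMaxInflightRequests(5000),
//		WithMaxInflightBytes(64*1024*1024), // cap unacknowledged bytes at 64 MiB
//	)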

func defaultStreamSettings() *streamSettings {
	return &streamSettings{
		streamType:          DefaultStream,
		MaxInflightRequests: 1000,
		MaxInflightBytes:    0,
		TraceID:             "",
	}
}

// StreamName returns the corresponding write stream ID being managed by this writer.
func (ms *ManagedStream) StreamName() string {
	return ms.streamSettings.streamID
}

// StreamType returns the configured type for this stream.
func (ms *ManagedStream) StreamType() StreamType {
	return ms.streamSettings.streamType
}

// FlushRows advances the offset at which rows in a BufferedStream are visible.  Calling
// this method for other stream types yields an error.
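//
// A minimal usage sketch, assuming rows were previously appended to a
// BufferedStream with explicit offsets:
//
//	// Make rows up to and including offset 10 visible to readers.
//	flushedTo, err := ms.FlushRows(ctx, 10)
//	if err != nil {
//		// TODO: Handle error.
//	}
//	_ = flushedTo // offset through which the stream is now flushed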
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
	req := &storagepb.FlushRowsRequest{
		WriteStream: ms.streamSettings.streamID,
		Offset: &wrapperspb.Int64Value{
			Value: offset,
		},
	}
	resp, err := ms.c.rawClient.FlushRows(ctx, req)
	recordStat(ms.ctx, FlushRequests, 1)
	if err != nil {
		return 0, err
	}
	return resp.GetOffset(), nil
}


// Finalize is used to mark a stream as complete, and thus ensure no further data can
// be appended to the stream.  You cannot finalize a DefaultStream, as it always exists.
//
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
// data in a PendingStream.
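//
// A hedged sketch of the finalize step for a PendingStream.  Committing the
// finalized data happens separately, via the BatchCommitWriteStreams RPC of
// the underlying v1beta2 API, which this writer does not wrap:
//
//	rowCount, err := ms.Finalize(ctx)
//	if err != nil {
//		// TODO: Handle error.
//	}
//	// rowCount reports how many rows the finalized stream contains; a
//	// subsequent batch commit makes those rows visible atomically.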
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
	// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
	req := &storagepb.FinalizeWriteStreamRequest{
		Name: ms.streamSettings.streamID,
	}
	resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
	if err != nil {
		return 0, err
	}
	return resp.GetRowCount(), nil
}


// getStream returns either a valid ARC client stream or a permanent error.
//
// Calling getStream locks the mutex.
func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
	ms.mu.Lock()
	defer ms.mu.Unlock()
	if ms.err != nil {
		return nil, nil, ms.err
	}
	ms.err = ms.ctx.Err()
	if ms.err != nil {
		return nil, nil, ms.err
	}

	// Always return the retained ARC if the arg differs.
	if arc != ms.arc {
		return ms.arc, ms.pending, nil
	}

	ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
	*ms.arc, ms.pending, ms.err = ms.openWithRetry()
	return ms.arc, ms.pending, ms.err
}

// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection.
//
// Only getStream() should call this, and thus the calling code has the mutex lock.
func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
	r := defaultRetryer{}
	for {
		recordStat(ms.ctx, AppendClientOpenCount, 1)
		streamID := ""
		if ms.streamSettings != nil {
			streamID = ms.streamSettings.streamID
		}
		arc, err := ms.open(streamID)
		bo, shouldRetry := r.Retry(err)
		if err != nil && shouldRetry {
			recordStat(ms.ctx, AppendClientOpenRetryCount, 1)
			if err := gax.Sleep(ms.ctx, bo); err != nil {
				return nil, nil, err
			}
			continue
		}
		if err == nil {
			// The channel relationship with its ARC is 1:1.  If we get a new ARC, create a new chan
			// and fire up the associated receive processor.
			ch := make(chan *pendingWrite)
			go recvProcessor(ms.ctx, arc, ms.fc, ch)
			// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
			// for every new connection.
			ms.streamSetup = new(sync.Once)
			return arc, ch, nil
		}
		return arc, nil, err
	}
}

// append sends a single pendingWrite to the service, (re)opening the
// underlying connection as needed and honoring the retry policy from the
// supplied call options.
func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error {
	var settings gax.CallSettings
	for _, opt := range opts {
		opt.Resolve(&settings)
	}
	var r gax.Retryer = &defaultRetryer{}
	if settings.Retry != nil {
		r = settings.Retry()
	}

	var arc *storagepb.BigQueryWrite_AppendRowsClient
	var ch chan *pendingWrite
	var err error

	for {
		arc, ch, err = ms.getStream(arc)
		if err != nil {
			return err
		}
		var req *storagepb.AppendRowsRequest
		ms.streamSetup.Do(func() {
			reqCopy := *pw.request
			reqCopy.WriteStream = ms.streamSettings.streamID
			reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
				ProtoDescriptor: ms.schemaDescriptor,
			}
			if ms.streamSettings.TraceID != "" {
				reqCopy.TraceId = ms.streamSettings.TraceID
			}
			req = &reqCopy
		})

		var err error
		if req == nil {
			err = (*arc).Send(pw.request)
		} else {
			// we had to amend the initial request
			err = (*arc).Send(req)
		}
		recordStat(ms.ctx, AppendRequests, 1)
		recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
		recordStat(ms.ctx, AppendRequestRows, int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())))
		if err != nil {
			status := grpcstatus.Convert(err)
			if status != nil {
				ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
				recordStat(ctx, AppendRequestErrors, 1)
			}
			bo, shouldRetry := r.Retry(err)
			if shouldRetry {
				if err := gax.Sleep(ms.ctx, bo); err != nil {
					return err
				}
				continue
			}
			ms.mu.Lock()
			ms.err = err
			ms.mu.Unlock()
		}
		if err == nil {
			ch <- pw
		}
		return err
	}
}

// Close closes a managed stream.
func (ms *ManagedStream) Close() error {

	var arc *storagepb.BigQueryWrite_AppendRowsClient

	arc, ch, err := ms.getStream(arc)
	if err != nil {
		return err
	}
	if ms.arc == nil {
		return fmt.Errorf("no stream exists")
	}
	err = (*arc).CloseSend()
	if err == nil {
		close(ch)
	}
	ms.mu.Lock()
	ms.err = io.EOF
	ms.mu.Unlock()
	// Propagate cancellation.
	if ms.cancel != nil {
		ms.cancel()
	}
	return err
}

// AppendRows sends the append requests to the service, and returns one AppendResult per row.
// The format of the row data is binary serialized protocol buffer bytes, and the message
// must adhere to the format of the schema Descriptor passed in when creating the managed stream.
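//
// A minimal usage sketch.  Here mesg is assumed to be a protocol buffer
// message matching the stream's schema descriptor, marshaled with
// google.golang.org/protobuf/proto:
//
//	b, err := proto.Marshal(mesg)
//	if err != nil {
//		// TODO: Handle error.
//	}
//	results, err := ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
//	if err != nil {
//		// TODO: Handle error.
//	}
//	// Each element of results tracks the eventual disposition of one row.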
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
	pw := newPendingWrite(data, offset)
	// check flow control
	if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
		// in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release.
		pw.markDone(NoStreamOffset, err, nil)
		return nil, err
	}
	// proceed to call
	if err := ms.append(pw); err != nil {
		// pending write is DOA.
		pw.markDone(NoStreamOffset, err, ms.fc)
		return nil, err
	}
	return pw.results, nil
}

// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.
//
// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
// with the mutex lock.
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) {
	// TODO:  We'd like to re-send requests that are in an ambiguous state due to channel errors.  For now, we simply
	// ensure that pending writes get acknowledged with a terminal state.
	for {
		select {
		case <-ctx.Done():
			// Context is done, so we're not going to get further updates.  Mark all work failed with the context error.
			for {
				pw, ok := <-ch
				if !ok {
					return
				}
				pw.markDone(NoStreamOffset, ctx.Err(), fc)
			}
		case nextWrite, ok := <-ch:
			if !ok {
				// Channel closed, all elements processed.
				return
			}

			// block until we get a corresponding response or err from stream.
			resp, err := arc.Recv()
			if err != nil {
				nextWrite.markDone(NoStreamOffset, err, fc)
				continue
			}
			recordStat(ctx, AppendResponses, 1)

			if status := resp.GetError(); status != nil {
				// Tag the error code so response error metrics are bucketed; fall
				// back to the untagged context if tagging fails.
				tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
				if tagErr != nil {
					tagCtx = ctx
				}
				recordStat(tagCtx, AppendResponseErrors, 1)
				nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
				continue
			}
			success := resp.GetAppendResult()
			off := success.GetOffset()
			if off != nil {
				nextWrite.markDone(off.GetValue(), nil, fc)
			} else {
				nextWrite.markDone(NoStreamOffset, nil, fc)
			}
		}
	}
}