1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package managedwriter
16
17import (
18	"context"
19	"fmt"
20	"io"
21	"sync"
22
23	"github.com/googleapis/gax-go/v2"
24	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
25	grpcstatus "google.golang.org/grpc/status"
26	"google.golang.org/protobuf/types/descriptorpb"
27	"google.golang.org/protobuf/types/known/wrapperspb"
28)
29
// StreamType indicates the type of stream this write client is managing.
type StreamType string

// DefaultStream most closely mimics the legacy BigQuery tabledata.insertAll
// semantics. Successful inserts are committed immediately, and offsets are not
// tracked, because all writes go into a "default" stream that always exists
// for a table.
var DefaultStream = StreamType("DEFAULT")

// CommittedStream appends data immediately, but creates a discrete stream for
// the work so that offset tracking can be used to track writes.
var CommittedStream = StreamType("COMMITTED")

// BufferedStream is a form of checkpointed stream that allows you to advance
// the offset of visible rows via Flush operations.
//
// NOTE: Buffered Streams are currently in limited preview, and as such
// methods like FlushRows() may yield errors for non-enrolled projects.
var BufferedStream = StreamType("BUFFERED")

// PendingStream is a stream in which no data is made visible to readers until
// the stream is finalized and committed explicitly.
var PendingStream = StreamType("PENDING")
57
58func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
59	switch t {
60	case CommittedStream:
61		return storagepb.WriteStream_COMMITTED
62	case PendingStream:
63		return storagepb.WriteStream_PENDING
64	case BufferedStream:
65		return storagepb.WriteStream_BUFFERED
66	default:
67		return storagepb.WriteStream_TYPE_UNSPECIFIED
68	}
69}
70
// ManagedStream is the abstraction over a single write stream.
//
// Unary RPCs (FlushRows, Finalize) go through the parent Client's rawClient.
// Appends are sent over an AppendRows connection managed by getStream, which
// (re)opens the connection as needed.
type ManagedStream struct {
	// Stream ID, stream type, in-flight limits and trace ID for this stream.
	streamSettings *streamSettings
	// Sent as the WriterSchema on the first append request of each connection.
	schemaDescriptor *descriptorpb.DescriptorProto
	// NOTE(review): not referenced by the code visible in this file; presumably
	// the destination table path — confirm against the constructor.
	destinationTable string
	// Parent client; its rawClient serves FlushRows and FinalizeWriteStream.
	c *Client
	// Acquired in AppendRows; pending writes release it when marked done.
	fc *flowController

	// aspects of the stream client
	ctx    context.Context // retained context for the stream; also drives stats, retry sleeps and receive processors
	cancel context.CancelFunc // invoked by Close to propagate cancellation
	open   func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection

	// mu guards the connection state below. getStream holds it, and
	// openWithRetry is only called from getStream.
	mu          sync.Mutex
	arc         *storagepb.BigQueryWrite_AppendRowsClient // current stream connection; pointer identity marks a connection "generation"
	err         error                                     // terminal error; once set, getStream returns it to every caller
	pending     chan *pendingWrite                        // writes awaiting status, consumed by the current connection's recvProcessor
	streamSetup *sync.Once                                // handles amending the first request in a new stream; replaced per connection
}
90
// streamClientFunc is the signature of a function that opens an AppendRows
// stream connection. Naming the type enables testing: presumably a fake can be
// substituted for the real client call (confirm against the tests).
type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)
93
// streamSettings govern behavior of the append stream RPCs.
type streamSettings struct {
	// streamID contains the reference to the destination stream. It is sent
	// as the WriteStream/Name of append, flush and finalize requests.
	streamID string

	// streamType governs behavior of the client, such as how
	// offset handling is managed.
	streamType StreamType

	// MaxInflightRequests governs how many unacknowledged
	// append writes can be outstanding into the system.
	// defaultStreamSettings sets this to 1000.
	MaxInflightRequests int

	// MaxInflightBytes governs how many unacknowledged
	// request bytes can be outstanding into the system.
	// defaultStreamSettings sets this to 0; NOTE(review): presumably 0 means
	// "no byte limit" — confirm against the flow controller.
	MaxInflightBytes int

	// TraceID can be set when appending data on a stream. Its
	// purpose is to aid in debug and diagnostic scenarios. When non-empty,
	// it is attached to the first request sent on each new connection.
	TraceID string

	// dataOrigin can be set for classifying metrics generated
	// by a stream. It is not referenced by the code in this file.
	dataOrigin string
}
120
121func defaultStreamSettings() *streamSettings {
122	return &streamSettings{
123		streamType:          DefaultStream,
124		MaxInflightRequests: 1000,
125		MaxInflightBytes:    0,
126		TraceID:             "",
127	}
128}
129
130// StreamName returns the corresponding write stream ID being managed by this writer.
131func (ms *ManagedStream) StreamName() string {
132	return ms.streamSettings.streamID
133}
134
135// StreamType returns the configured type for this stream.
136func (ms *ManagedStream) StreamType() StreamType {
137	return ms.streamSettings.streamType
138}
139
140// FlushRows advances the offset at which rows in a BufferedStream are visible.  Calling
141// this method for other stream types yields an error.
142func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
143	req := &storagepb.FlushRowsRequest{
144		WriteStream: ms.streamSettings.streamID,
145		Offset: &wrapperspb.Int64Value{
146			Value: offset,
147		},
148	}
149	resp, err := ms.c.rawClient.FlushRows(ctx, req)
150	recordStat(ms.ctx, FlushRequests, 1)
151	if err != nil {
152		return 0, err
153	}
154	return resp.GetOffset(), nil
155}
156
157// Finalize is used to mark a stream as complete, and thus ensure no further data can
158// be appended to the stream.  You cannot finalize a DefaultStream, as it always exists.
159//
160// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
161// data in a PendingStream.
162func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
163	// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
164	req := &storagepb.FinalizeWriteStreamRequest{
165		Name: ms.streamSettings.streamID,
166	}
167	resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
168	if err != nil {
169		return 0, err
170	}
171	return resp.GetRowCount(), nil
172}
173
174// getStream returns either a valid ARC client stream or permanent error.
175//
176// Calling getStream locks the mutex.
177func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
178	ms.mu.Lock()
179	defer ms.mu.Unlock()
180	if ms.err != nil {
181		return nil, nil, ms.err
182	}
183	ms.err = ms.ctx.Err()
184	if ms.err != nil {
185		return nil, nil, ms.err
186	}
187
188	// Always return the retained ARC if the arg differs.
189	if arc != ms.arc {
190		return ms.arc, ms.pending, nil
191	}
192
193	ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
194	*ms.arc, ms.pending, ms.err = ms.openWithRetry()
195	return ms.arc, ms.pending, ms.err
196}
197
198// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection.
199//
200// Only getStream() should call this, and thus the calling code has the mutex lock.
201func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
202	r := defaultRetryer{}
203	for {
204		recordStat(ms.ctx, AppendClientOpenCount, 1)
205		streamID := ""
206		if ms.streamSettings != nil {
207			streamID = ms.streamSettings.streamID
208		}
209		arc, err := ms.open(streamID)
210		bo, shouldRetry := r.Retry(err)
211		if err != nil && shouldRetry {
212			recordStat(ms.ctx, AppendClientOpenRetryCount, 1)
213			if err := gax.Sleep(ms.ctx, bo); err != nil {
214				return nil, nil, err
215			}
216			continue
217		}
218		if err == nil {
219			// The channel relationship with its ARC is 1:1.  If we get a new ARC, create a new chan
220			// and fire up the associated receive processor.
221			ch := make(chan *pendingWrite)
222			go recvProcessor(ms.ctx, arc, ms.fc, ch)
223			// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
224			// for every new connection.
225			ms.streamSetup = new(sync.Once)
226			return arc, ch, nil
227		}
228		return arc, nil, err
229	}
230}
231
232func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error {
233	var settings gax.CallSettings
234	for _, opt := range opts {
235		opt.Resolve(&settings)
236	}
237	var r gax.Retryer = &defaultRetryer{}
238	if settings.Retry != nil {
239		r = settings.Retry()
240	}
241
242	var arc *storagepb.BigQueryWrite_AppendRowsClient
243	var ch chan *pendingWrite
244	var err error
245
246	for {
247		arc, ch, err = ms.getStream(arc)
248		if err != nil {
249			return err
250		}
251		var req *storagepb.AppendRowsRequest
252		ms.streamSetup.Do(func() {
253			reqCopy := *pw.request
254			reqCopy.WriteStream = ms.streamSettings.streamID
255			reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
256				ProtoDescriptor: ms.schemaDescriptor,
257			}
258			if ms.streamSettings.TraceID != "" {
259				reqCopy.TraceId = ms.streamSettings.TraceID
260			}
261			req = &reqCopy
262		})
263
264		var err error
265		if req == nil {
266			err = (*arc).Send(pw.request)
267		} else {
268			// we had to amend the initial request
269			err = (*arc).Send(req)
270		}
271		recordStat(ms.ctx, AppendRequests, 1)
272		if err != nil {
273			bo, shouldRetry := r.Retry(err)
274			if shouldRetry {
275				if err := gax.Sleep(ms.ctx, bo); err != nil {
276					return err
277				}
278				continue
279			}
280			ms.mu.Lock()
281			ms.err = err
282			ms.mu.Unlock()
283		}
284		if err == nil {
285			ch <- pw
286		}
287		return err
288	}
289}
290
291// Close closes a managed stream.
292func (ms *ManagedStream) Close() error {
293
294	var arc *storagepb.BigQueryWrite_AppendRowsClient
295
296	arc, ch, err := ms.getStream(arc)
297	if err != nil {
298		return err
299	}
300	if ms.arc == nil {
301		return fmt.Errorf("no stream exists")
302	}
303	err = (*arc).CloseSend()
304	if err == nil {
305		close(ch)
306	}
307	ms.mu.Lock()
308	ms.err = io.EOF
309	ms.mu.Unlock()
310	// Propagate cancellation.
311	if ms.cancel != nil {
312		ms.cancel()
313	}
314	return err
315}
316
317// AppendRows sends the append requests to the service, and returns one AppendResult per row.
318// The format of the row data is binary serialized protocol buffer bytes, and and the message
319// must adhere to the format of the schema Descriptor passed in when creating the managed stream.
320func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
321	pw := newPendingWrite(data, offset)
322	// check flow control
323	if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
324		// in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release.
325		pw.markDone(NoStreamOffset, err, nil)
326	}
327	// proceed to call
328	if err := ms.append(pw); err != nil {
329		// pending write is DOA.
330		pw.markDone(NoStreamOffset, err, ms.fc)
331		return nil, err
332	}
333	return pw.results, nil
334}
335
336// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.
337//
338// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
339// with the mutex lock.
340func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) {
341	// TODO:  We'd like to re-send requests that are in an ambiguous state due to channel errors.  For now, we simply
342	// ensure that pending writes get acknowledged with a terminal state.
343	for {
344		select {
345		case <-ctx.Done():
346			// Context is done, so we're not going to get further updates.  Mark all work failed with the context error.
347			for {
348				pw, ok := <-ch
349				if !ok {
350					return
351				}
352				pw.markDone(NoStreamOffset, ctx.Err(), fc)
353			}
354		case nextWrite, ok := <-ch:
355			if !ok {
356				// Channel closed, all elements processed.
357				return
358			}
359
360			// block until we get a corresponding response or err from stream.
361			resp, err := arc.Recv()
362			if err != nil {
363				nextWrite.markDone(NoStreamOffset, err, fc)
364				continue
365			}
366			recordStat(ctx, AppendResponses, 1)
367
368			if status := resp.GetError(); status != nil {
369				nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
370				continue
371			}
372			success := resp.GetAppendResult()
373			off := success.GetOffset()
374			if off != nil {
375				nextWrite.markDone(off.GetValue(), nil, fc)
376			} else {
377				nextWrite.markDone(NoStreamOffset, nil, fc)
378			}
379		}
380	}
381}
382