1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package managedwriter 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "sync" 22 23 "github.com/googleapis/gax-go/v2" 24 storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" 25 grpcstatus "google.golang.org/grpc/status" 26 "google.golang.org/protobuf/types/descriptorpb" 27 "google.golang.org/protobuf/types/known/wrapperspb" 28) 29 30// StreamType indicates the type of stream this write client is managing. 31type StreamType string 32 33var ( 34 // DefaultStream most closely mimics the legacy bigquery 35 // tabledata.insertAll semantics. Successful inserts are 36 // committed immediately, and there's no tracking offsets as 37 // all writes go into a "default" stream that always exists 38 // for a table. 39 DefaultStream StreamType = "DEFAULT" 40 41 // CommittedStream appends data immediately, but creates a 42 // discrete stream for the work so that offset tracking can 43 // be used to track writes. 44 CommittedStream StreamType = "COMMITTED" 45 46 // BufferedStream is a form of checkpointed stream, that allows 47 // you to advance the offset of visible rows via Flush operations. 48 // 49 // NOTE: Buffered Streams are currently in limited preview, and as such 50 // methods like FlushRows() may yield errors for non-enrolled projects. 51 BufferedStream StreamType = "BUFFERED" 52 53 // PendingStream is a stream in which no data is made visible to 54 // readers until the stream is finalized and committed explicitly. 55 PendingStream StreamType = "PENDING" 56) 57 58func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type { 59 switch t { 60 case CommittedStream: 61 return storagepb.WriteStream_COMMITTED 62 case PendingStream: 63 return storagepb.WriteStream_PENDING 64 case BufferedStream: 65 return storagepb.WriteStream_BUFFERED 66 default: 67 return storagepb.WriteStream_TYPE_UNSPECIFIED 68 } 69} 70 71// ManagedStream is the abstraction over a single write stream. 72type ManagedStream struct { 73 streamSettings *streamSettings 74 schemaDescriptor *descriptorpb.DescriptorProto 75 destinationTable string 76 c *Client 77 fc *flowController 78 79 // aspects of the stream client 80 ctx context.Context // retained context for the stream 81 cancel context.CancelFunc 82 open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection 83 84 mu sync.Mutex 85 arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection 86 err error // terminal error 87 pending chan *pendingWrite // writes awaiting status 88 streamSetup *sync.Once // handles amending the first request in a new stream 89} 90 91// enables testing 92type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) 93 94// streamSettings govern behavior of the append stream RPCs. 95type streamSettings struct { 96 97 // streamID contains the reference to the destination stream. 98 streamID string 99 100 // streamType governs behavior of the client, such as how 101 // offset handling is managed. 102 streamType StreamType 103 104 // MaxInflightRequests governs how many unacknowledged 105 // append writes can be outstanding into the system. 106 MaxInflightRequests int 107 108 // MaxInflightBytes governs how many unacknowledged 109 // request bytes can be outstanding into the system. 110 MaxInflightBytes int 111 112 // TraceID can be set when appending data on a stream. It's 113 // purpose is to aid in debug and diagnostic scenarios. 114 TraceID string 115 116 // dataOrigin can be set for classifying metrics generated 117 // by a stream. 118 dataOrigin string 119} 120 121func defaultStreamSettings() *streamSettings { 122 return &streamSettings{ 123 streamType: DefaultStream, 124 MaxInflightRequests: 1000, 125 MaxInflightBytes: 0, 126 TraceID: "", 127 } 128} 129 130// StreamName returns the corresponding write stream ID being managed by this writer. 131func (ms *ManagedStream) StreamName() string { 132 return ms.streamSettings.streamID 133} 134 135// StreamType returns the configured type for this stream. 136func (ms *ManagedStream) StreamType() StreamType { 137 return ms.streamSettings.streamType 138} 139 140// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling 141// this method for other stream types yields an error. 142func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) { 143 req := &storagepb.FlushRowsRequest{ 144 WriteStream: ms.streamSettings.streamID, 145 Offset: &wrapperspb.Int64Value{ 146 Value: offset, 147 }, 148 } 149 resp, err := ms.c.rawClient.FlushRows(ctx, req) 150 recordStat(ms.ctx, FlushRequests, 1) 151 if err != nil { 152 return 0, err 153 } 154 return resp.GetOffset(), nil 155} 156 157// Finalize is used to mark a stream as complete, and thus ensure no further data can 158// be appended to the stream. You cannot finalize a DefaultStream, as it always exists. 159// 160// Finalizing does not advance the current offset of a BufferedStream, nor does it commit 161// data in a PendingStream. 162func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { 163 // TODO: consider blocking for in-flight appends once we have an appendStream plumbed in. 164 req := &storagepb.FinalizeWriteStreamRequest{ 165 Name: ms.streamSettings.streamID, 166 } 167 resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req) 168 if err != nil { 169 return 0, err 170 } 171 return resp.GetRowCount(), nil 172} 173 174// getStream returns either a valid ARC client stream or permanent error. 175// 176// Calling getStream locks the mutex. 177func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { 178 ms.mu.Lock() 179 defer ms.mu.Unlock() 180 if ms.err != nil { 181 return nil, nil, ms.err 182 } 183 ms.err = ms.ctx.Err() 184 if ms.err != nil { 185 return nil, nil, ms.err 186 } 187 188 // Always return the retained ARC if the arg differs. 189 if arc != ms.arc { 190 return ms.arc, ms.pending, nil 191 } 192 193 ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) 194 *ms.arc, ms.pending, ms.err = ms.openWithRetry() 195 return ms.arc, ms.pending, ms.err 196} 197 198// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection. 199// 200// Only getStream() should call this, and thus the calling code has the mutex lock. 201func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { 202 r := defaultRetryer{} 203 for { 204 recordStat(ms.ctx, AppendClientOpenCount, 1) 205 streamID := "" 206 if ms.streamSettings != nil { 207 streamID = ms.streamSettings.streamID 208 } 209 arc, err := ms.open(streamID) 210 bo, shouldRetry := r.Retry(err) 211 if err != nil && shouldRetry { 212 recordStat(ms.ctx, AppendClientOpenRetryCount, 1) 213 if err := gax.Sleep(ms.ctx, bo); err != nil { 214 return nil, nil, err 215 } 216 continue 217 } 218 if err == nil { 219 // The channel relationship with its ARC is 1:1. If we get a new ARC, create a new chan 220 // and fire up the associated receive processor. 221 ch := make(chan *pendingWrite) 222 go recvProcessor(ms.ctx, arc, ms.fc, ch) 223 // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work 224 // for every new connection. 225 ms.streamSetup = new(sync.Once) 226 return arc, ch, nil 227 } 228 return arc, nil, err 229 } 230} 231 232func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error { 233 var settings gax.CallSettings 234 for _, opt := range opts { 235 opt.Resolve(&settings) 236 } 237 var r gax.Retryer = &defaultRetryer{} 238 if settings.Retry != nil { 239 r = settings.Retry() 240 } 241 242 var arc *storagepb.BigQueryWrite_AppendRowsClient 243 var ch chan *pendingWrite 244 var err error 245 246 for { 247 arc, ch, err = ms.getStream(arc) 248 if err != nil { 249 return err 250 } 251 var req *storagepb.AppendRowsRequest 252 ms.streamSetup.Do(func() { 253 reqCopy := *pw.request 254 reqCopy.WriteStream = ms.streamSettings.streamID 255 reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ 256 ProtoDescriptor: ms.schemaDescriptor, 257 } 258 if ms.streamSettings.TraceID != "" { 259 reqCopy.TraceId = ms.streamSettings.TraceID 260 } 261 req = &reqCopy 262 }) 263 264 var err error 265 if req == nil { 266 err = (*arc).Send(pw.request) 267 } else { 268 // we had to amend the initial request 269 err = (*arc).Send(req) 270 } 271 recordStat(ms.ctx, AppendRequests, 1) 272 if err != nil { 273 bo, shouldRetry := r.Retry(err) 274 if shouldRetry { 275 if err := gax.Sleep(ms.ctx, bo); err != nil { 276 return err 277 } 278 continue 279 } 280 ms.mu.Lock() 281 ms.err = err 282 ms.mu.Unlock() 283 } 284 if err == nil { 285 ch <- pw 286 } 287 return err 288 } 289} 290 291// Close closes a managed stream. 292func (ms *ManagedStream) Close() error { 293 294 var arc *storagepb.BigQueryWrite_AppendRowsClient 295 296 arc, ch, err := ms.getStream(arc) 297 if err != nil { 298 return err 299 } 300 if ms.arc == nil { 301 return fmt.Errorf("no stream exists") 302 } 303 err = (*arc).CloseSend() 304 if err == nil { 305 close(ch) 306 } 307 ms.mu.Lock() 308 ms.err = io.EOF 309 ms.mu.Unlock() 310 // Propagate cancellation. 311 if ms.cancel != nil { 312 ms.cancel() 313 } 314 return err 315} 316 317// AppendRows sends the append requests to the service, and returns one AppendResult per row. 318// The format of the row data is binary serialized protocol buffer bytes, and and the message 319// must adhere to the format of the schema Descriptor passed in when creating the managed stream. 320func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) { 321 pw := newPendingWrite(data, offset) 322 // check flow control 323 if err := ms.fc.acquire(ctx, pw.reqSize); err != nil { 324 // in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release. 325 pw.markDone(NoStreamOffset, err, nil) 326 } 327 // proceed to call 328 if err := ms.append(pw); err != nil { 329 // pending write is DOA. 330 pw.markDone(NoStreamOffset, err, ms.fc) 331 return nil, err 332 } 333 return pw.results, nil 334} 335 336// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. 337// 338// The receive processor only deals with a single instance of a connection/channel, and thus should never interact 339// with the mutex lock. 340func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) { 341 // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply 342 // ensure that pending writes get acknowledged with a terminal state. 343 for { 344 select { 345 case <-ctx.Done(): 346 // Context is done, so we're not going to get further updates. Mark all work failed with the context error. 347 for { 348 pw, ok := <-ch 349 if !ok { 350 return 351 } 352 pw.markDone(NoStreamOffset, ctx.Err(), fc) 353 } 354 case nextWrite, ok := <-ch: 355 if !ok { 356 // Channel closed, all elements processed. 357 return 358 } 359 360 // block until we get a corresponding response or err from stream. 361 resp, err := arc.Recv() 362 if err != nil { 363 nextWrite.markDone(NoStreamOffset, err, fc) 364 continue 365 } 366 recordStat(ctx, AppendResponses, 1) 367 368 if status := resp.GetError(); status != nil { 369 nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc) 370 continue 371 } 372 success := resp.GetAppendResult() 373 off := success.GetOffset() 374 if off != nil { 375 nextWrite.markDone(off.GetValue(), nil, fc) 376 } else { 377 nextWrite.markDone(NoStreamOffset, nil, fc) 378 } 379 } 380 } 381} 382