1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package managedwriter 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "sync" 22 23 "github.com/googleapis/gax-go/v2" 24 "go.opencensus.io/tag" 25 storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" 26 "google.golang.org/grpc/codes" 27 grpcstatus "google.golang.org/grpc/status" 28 "google.golang.org/protobuf/types/descriptorpb" 29 "google.golang.org/protobuf/types/known/wrapperspb" 30) 31 32// StreamType indicates the type of stream this write client is managing. 33type StreamType string 34 35var ( 36 // DefaultStream most closely mimics the legacy bigquery 37 // tabledata.insertAll semantics. Successful inserts are 38 // committed immediately, and there's no tracking offsets as 39 // all writes go into a "default" stream that always exists 40 // for a table. 41 DefaultStream StreamType = "DEFAULT" 42 43 // CommittedStream appends data immediately, but creates a 44 // discrete stream for the work so that offset tracking can 45 // be used to track writes. 46 CommittedStream StreamType = "COMMITTED" 47 48 // BufferedStream is a form of checkpointed stream, that allows 49 // you to advance the offset of visible rows via Flush operations. 50 // 51 // NOTE: Buffered Streams are currently in limited preview, and as such 52 // methods like FlushRows() may yield errors for non-enrolled projects. 53 BufferedStream StreamType = "BUFFERED" 54 55 // PendingStream is a stream in which no data is made visible to 56 // readers until the stream is finalized and committed explicitly. 57 PendingStream StreamType = "PENDING" 58) 59 60func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type { 61 switch t { 62 case CommittedStream: 63 return storagepb.WriteStream_COMMITTED 64 case PendingStream: 65 return storagepb.WriteStream_PENDING 66 case BufferedStream: 67 return storagepb.WriteStream_BUFFERED 68 default: 69 return storagepb.WriteStream_TYPE_UNSPECIFIED 70 } 71} 72 73// ManagedStream is the abstraction over a single write stream. 74type ManagedStream struct { 75 streamSettings *streamSettings 76 schemaDescriptor *descriptorpb.DescriptorProto 77 destinationTable string 78 c *Client 79 fc *flowController 80 81 // aspects of the stream client 82 ctx context.Context // retained context for the stream 83 cancel context.CancelFunc 84 open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection 85 86 mu sync.Mutex 87 arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection 88 err error // terminal error 89 pending chan *pendingWrite // writes awaiting status 90 streamSetup *sync.Once // handles amending the first request in a new stream 91} 92 93// enables testing 94type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) 95 96// streamSettings govern behavior of the append stream RPCs. 97type streamSettings struct { 98 99 // streamID contains the reference to the destination stream. 100 streamID string 101 102 // streamType governs behavior of the client, such as how 103 // offset handling is managed. 104 streamType StreamType 105 106 // MaxInflightRequests governs how many unacknowledged 107 // append writes can be outstanding into the system. 108 MaxInflightRequests int 109 110 // MaxInflightBytes governs how many unacknowledged 111 // request bytes can be outstanding into the system. 112 MaxInflightBytes int 113 114 // TraceID can be set when appending data on a stream. It's 115 // purpose is to aid in debug and diagnostic scenarios. 116 TraceID string 117 118 // dataOrigin can be set for classifying metrics generated 119 // by a stream. 120 dataOrigin string 121} 122 123func defaultStreamSettings() *streamSettings { 124 return &streamSettings{ 125 streamType: DefaultStream, 126 MaxInflightRequests: 1000, 127 MaxInflightBytes: 0, 128 TraceID: "", 129 } 130} 131 132// StreamName returns the corresponding write stream ID being managed by this writer. 133func (ms *ManagedStream) StreamName() string { 134 return ms.streamSettings.streamID 135} 136 137// StreamType returns the configured type for this stream. 138func (ms *ManagedStream) StreamType() StreamType { 139 return ms.streamSettings.streamType 140} 141 142// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling 143// this method for other stream types yields an error. 144func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) { 145 req := &storagepb.FlushRowsRequest{ 146 WriteStream: ms.streamSettings.streamID, 147 Offset: &wrapperspb.Int64Value{ 148 Value: offset, 149 }, 150 } 151 resp, err := ms.c.rawClient.FlushRows(ctx, req) 152 recordStat(ms.ctx, FlushRequests, 1) 153 if err != nil { 154 return 0, err 155 } 156 return resp.GetOffset(), nil 157} 158 159// Finalize is used to mark a stream as complete, and thus ensure no further data can 160// be appended to the stream. You cannot finalize a DefaultStream, as it always exists. 161// 162// Finalizing does not advance the current offset of a BufferedStream, nor does it commit 163// data in a PendingStream. 164func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { 165 // TODO: consider blocking for in-flight appends once we have an appendStream plumbed in. 166 req := &storagepb.FinalizeWriteStreamRequest{ 167 Name: ms.streamSettings.streamID, 168 } 169 resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req) 170 if err != nil { 171 return 0, err 172 } 173 return resp.GetRowCount(), nil 174} 175 176// getStream returns either a valid ARC client stream or permanent error. 177// 178// Calling getStream locks the mutex. 179func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { 180 ms.mu.Lock() 181 defer ms.mu.Unlock() 182 if ms.err != nil { 183 return nil, nil, ms.err 184 } 185 ms.err = ms.ctx.Err() 186 if ms.err != nil { 187 return nil, nil, ms.err 188 } 189 190 // Always return the retained ARC if the arg differs. 191 if arc != ms.arc { 192 return ms.arc, ms.pending, nil 193 } 194 195 ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) 196 *ms.arc, ms.pending, ms.err = ms.openWithRetry() 197 return ms.arc, ms.pending, ms.err 198} 199 200// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection. 201// 202// Only getStream() should call this, and thus the calling code has the mutex lock. 203func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { 204 r := defaultRetryer{} 205 for { 206 recordStat(ms.ctx, AppendClientOpenCount, 1) 207 streamID := "" 208 if ms.streamSettings != nil { 209 streamID = ms.streamSettings.streamID 210 } 211 arc, err := ms.open(streamID) 212 bo, shouldRetry := r.Retry(err) 213 if err != nil && shouldRetry { 214 recordStat(ms.ctx, AppendClientOpenRetryCount, 1) 215 if err := gax.Sleep(ms.ctx, bo); err != nil { 216 return nil, nil, err 217 } 218 continue 219 } 220 if err == nil { 221 // The channel relationship with its ARC is 1:1. If we get a new ARC, create a new chan 222 // and fire up the associated receive processor. 223 ch := make(chan *pendingWrite) 224 go recvProcessor(ms.ctx, arc, ms.fc, ch) 225 // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work 226 // for every new connection. 227 ms.streamSetup = new(sync.Once) 228 return arc, ch, nil 229 } 230 return arc, nil, err 231 } 232} 233 234func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error { 235 var settings gax.CallSettings 236 for _, opt := range opts { 237 opt.Resolve(&settings) 238 } 239 var r gax.Retryer = &defaultRetryer{} 240 if settings.Retry != nil { 241 r = settings.Retry() 242 } 243 244 var arc *storagepb.BigQueryWrite_AppendRowsClient 245 var ch chan *pendingWrite 246 var err error 247 248 for { 249 arc, ch, err = ms.getStream(arc) 250 if err != nil { 251 return err 252 } 253 var req *storagepb.AppendRowsRequest 254 ms.streamSetup.Do(func() { 255 reqCopy := *pw.request 256 reqCopy.WriteStream = ms.streamSettings.streamID 257 reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ 258 ProtoDescriptor: ms.schemaDescriptor, 259 } 260 if ms.streamSettings.TraceID != "" { 261 reqCopy.TraceId = ms.streamSettings.TraceID 262 } 263 req = &reqCopy 264 }) 265 266 var err error 267 if req == nil { 268 err = (*arc).Send(pw.request) 269 } else { 270 // we had to amend the initial request 271 err = (*arc).Send(req) 272 } 273 recordStat(ms.ctx, AppendRequests, 1) 274 recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) 275 recordStat(ms.ctx, AppendRequestRows, int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))) 276 if err != nil { 277 status := grpcstatus.Convert(err) 278 if status != nil { 279 ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) 280 recordStat(ctx, AppendRequestErrors, 1) 281 } 282 bo, shouldRetry := r.Retry(err) 283 if shouldRetry { 284 if err := gax.Sleep(ms.ctx, bo); err != nil { 285 return err 286 } 287 continue 288 } 289 ms.mu.Lock() 290 ms.err = err 291 ms.mu.Unlock() 292 } 293 if err == nil { 294 ch <- pw 295 } 296 return err 297 } 298} 299 300// Close closes a managed stream. 301func (ms *ManagedStream) Close() error { 302 303 var arc *storagepb.BigQueryWrite_AppendRowsClient 304 305 arc, ch, err := ms.getStream(arc) 306 if err != nil { 307 return err 308 } 309 if ms.arc == nil { 310 return fmt.Errorf("no stream exists") 311 } 312 err = (*arc).CloseSend() 313 if err == nil { 314 close(ch) 315 } 316 ms.mu.Lock() 317 ms.err = io.EOF 318 ms.mu.Unlock() 319 // Propagate cancellation. 320 if ms.cancel != nil { 321 ms.cancel() 322 } 323 return err 324} 325 326// AppendRows sends the append requests to the service, and returns one AppendResult per row. 327// The format of the row data is binary serialized protocol buffer bytes, and and the message 328// must adhere to the format of the schema Descriptor passed in when creating the managed stream. 329func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) { 330 pw := newPendingWrite(data, offset) 331 // check flow control 332 if err := ms.fc.acquire(ctx, pw.reqSize); err != nil { 333 // in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release. 334 pw.markDone(NoStreamOffset, err, nil) 335 } 336 // proceed to call 337 if err := ms.append(pw); err != nil { 338 // pending write is DOA. 339 pw.markDone(NoStreamOffset, err, ms.fc) 340 return nil, err 341 } 342 return pw.results, nil 343} 344 345// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. 346// 347// The receive processor only deals with a single instance of a connection/channel, and thus should never interact 348// with the mutex lock. 349func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) { 350 // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply 351 // ensure that pending writes get acknowledged with a terminal state. 352 for { 353 select { 354 case <-ctx.Done(): 355 // Context is done, so we're not going to get further updates. Mark all work failed with the context error. 356 for { 357 pw, ok := <-ch 358 if !ok { 359 return 360 } 361 pw.markDone(NoStreamOffset, ctx.Err(), fc) 362 } 363 case nextWrite, ok := <-ch: 364 if !ok { 365 // Channel closed, all elements processed. 366 return 367 } 368 369 // block until we get a corresponding response or err from stream. 370 resp, err := arc.Recv() 371 if err != nil { 372 nextWrite.markDone(NoStreamOffset, err, fc) 373 continue 374 } 375 recordStat(ctx, AppendResponses, 1) 376 377 if status := resp.GetError(); status != nil { 378 tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())) 379 if err != nil { 380 tagCtx = ctx 381 } 382 recordStat(tagCtx, AppendResponseErrors, 1) 383 nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc) 384 continue 385 } 386 success := resp.GetAppendResult() 387 off := success.GetOffset() 388 if off != nil { 389 nextWrite.markDone(off.GetValue(), nil, fc) 390 } else { 391 nextWrite.markDone(NoStreamOffset, nil, fc) 392 } 393 } 394 } 395} 396