1// Copyright 2021 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package managedwriter 16 17import ( 18 "context" 19 "fmt" 20 "math" 21 "testing" 22 "time" 23 24 "cloud.google.com/go/bigquery" 25 "cloud.google.com/go/bigquery/storage/managedwriter/adapt" 26 "cloud.google.com/go/bigquery/storage/managedwriter/testdata" 27 "cloud.google.com/go/internal/testutil" 28 "cloud.google.com/go/internal/uid" 29 "go.opencensus.io/stats/view" 30 "google.golang.org/api/option" 31 "google.golang.org/protobuf/encoding/protojson" 32 "google.golang.org/protobuf/proto" 33 "google.golang.org/protobuf/reflect/protodesc" 34 "google.golang.org/protobuf/reflect/protoreflect" 35 "google.golang.org/protobuf/types/descriptorpb" 36 "google.golang.org/protobuf/types/dynamicpb" 37) 38 39var ( 40 datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()}) 41 tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()}) 42 defaultTestTimeout = 30 * time.Second 43) 44 45// our test data has cardinality 5 for names, 3 for values 46var testSimpleData = []*testdata.SimpleMessageProto2{ 47 {Name: proto.String("one"), Value: proto.Int64(1)}, 48 {Name: proto.String("two"), Value: proto.Int64(2)}, 49 {Name: proto.String("three"), Value: proto.Int64(3)}, 50 {Name: proto.String("four"), Value: proto.Int64(1)}, 51 {Name: proto.String("five"), Value: proto.Int64(2)}, 52} 53 54func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { 55 if testing.Short() { 56 t.Skip("Integration tests skipped in short mode") 57 } 58 projID := testutil.ProjID() 59 if projID == "" { 60 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") 61 } 62 ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery") 63 if ts == nil { 64 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") 65 } 66 opts = append(opts, option.WithTokenSource(ts)) 67 client, err := NewClient(ctx, projID, opts...) 68 if err != nil { 69 t.Fatalf("couldn't create managedwriter client: %v", err) 70 } 71 72 bqClient, err := bigquery.NewClient(ctx, projID, opts...) 73 if err != nil { 74 t.Fatalf("couldn't create bigquery client: %v", err) 75 } 76 return client, bqClient 77} 78 79// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred. 80func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) { 81 dataset := bqc.Dataset(datasetIDs.New()) 82 if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil { 83 return nil, nil, err 84 } 85 return dataset, func() { 86 if err := dataset.DeleteWithContents(ctx); err != nil { 87 t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err) 88 } 89 }, nil 90} 91 92// setupDynamicDescriptors aids testing when not using a supplied proto 93func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) { 94 convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema) 95 if err != nil { 96 t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err) 97 } 98 99 descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root") 100 if err != nil { 101 t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err) 102 } 103 messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) 104 if !ok { 105 t.Fatalf("adapted descriptor is not a message descriptor") 106 } 107 return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) 108} 109 110func TestIntegration_ManagedWriter(t *testing.T) { 111 mwClient, bqClient := getTestClients(context.Background(), t) 112 defer mwClient.Close() 113 defer bqClient.Close() 114 115 dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1") 116 if err != nil { 117 t.Fatalf("failed to init test dataset: %v", err) 118 } 119 defer cleanup() 120 121 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 122 defer cancel() 123 124 t.Run("group", func(t *testing.T) { 125 t.Run("DefaultStream", func(t *testing.T) { 126 t.Parallel() 127 testDefaultStream(ctx, t, mwClient, bqClient, dataset) 128 }) 129 t.Run("DefaultStreamDynamicJSON", func(t *testing.T) { 130 t.Parallel() 131 testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset) 132 }) 133 t.Run("CommittedStream", func(t *testing.T) { 134 t.Parallel() 135 testCommittedStream(ctx, t, mwClient, bqClient, dataset) 136 }) 137 t.Run("BufferedStream", func(t *testing.T) { 138 t.Parallel() 139 testBufferedStream(ctx, t, mwClient, bqClient, dataset) 140 }) 141 t.Run("PendingStream", func(t *testing.T) { 142 t.Parallel() 143 testPendingStream(ctx, t, mwClient, bqClient, dataset) 144 }) 145 t.Run("Instrumentation", func(t *testing.T) { 146 // Don't run this in parallel, we only want to collect stats from this subtest. 147 testInstrumentation(ctx, t, mwClient, bqClient, dataset) 148 }) 149 }) 150 151} 152 153func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 154 testTable := dataset.Table(tableIDs.New()) 155 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 156 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) 157 } 158 // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation 159 // to send as the stream's schema. 160 m := &testdata.SimpleMessageProto2{} 161 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 162 163 // setup a new stream. 164 ms, err := mwClient.NewManagedStream(ctx, 165 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 166 WithType(DefaultStream), 167 WithSchemaDescriptor(descriptorProto), 168 ) 169 if err != nil { 170 t.Fatalf("NewManagedStream: %v", err) 171 } 172 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 173 withExactRowCount(0)) 174 175 // First, send the test rows individually. 176 var results []*AppendResult 177 for k, mesg := range testSimpleData { 178 b, err := proto.Marshal(mesg) 179 if err != nil { 180 t.Errorf("failed to marshal message %d: %v", k, err) 181 } 182 data := [][]byte{b} 183 results, err = ms.AppendRows(ctx, data, NoStreamOffset) 184 if err != nil { 185 t.Errorf("single-row append %d failed: %v", k, err) 186 } 187 } 188 // wait for the result to indicate ready, then validate. 189 results[0].Ready() 190 validateTableConstraints(ctx, t, bqClient, testTable, "after first send round", 191 withExactRowCount(int64(len(testSimpleData))), 192 withDistinctValues("name", int64(len(testSimpleData)))) 193 194 // Now, send the test rows grouped into in a single append. 195 var data [][]byte 196 for k, mesg := range testSimpleData { 197 b, err := proto.Marshal(mesg) 198 if err != nil { 199 t.Errorf("failed to marshal message %d: %v", k, err) 200 } 201 data = append(data, b) 202 } 203 results, err = ms.AppendRows(ctx, data, NoStreamOffset) 204 if err != nil { 205 t.Errorf("grouped-row append failed: %v", err) 206 } 207 // wait for the result to indicate ready, then validate again. Our total rows have increased, but 208 // cardinality should not. 209 results[0].Ready() 210 validateTableConstraints(ctx, t, bqClient, testTable, "after second send round", 211 withExactRowCount(int64(2*len(testSimpleData))), 212 withDistinctValues("name", int64(len(testSimpleData))), 213 withDistinctValues("value", int64(3)), 214 ) 215} 216 217func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 218 testTable := dataset.Table(tableIDs.New()) 219 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 220 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 221 } 222 223 md, descriptorProto := setupDynamicDescriptors(t, testdata.SimpleMessageSchema) 224 225 ms, err := mwClient.NewManagedStream(ctx, 226 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 227 WithType(DefaultStream), 228 WithSchemaDescriptor(descriptorProto), 229 ) 230 if err != nil { 231 t.Fatalf("NewManagedStream: %v", err) 232 } 233 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 234 withExactRowCount(0)) 235 236 sampleJSONData := [][]byte{ 237 []byte(`{"name": "one", "value": 1}`), 238 []byte(`{"name": "two", "value": 2}`), 239 []byte(`{"name": "three", "value": 3}`), 240 []byte(`{"name": "four", "value": 4}`), 241 []byte(`{"name": "five", "value": 5}`), 242 } 243 244 var results []*AppendResult 245 for k, v := range sampleJSONData { 246 message := dynamicpb.NewMessage(md) 247 248 // First, json->proto message 249 err = protojson.Unmarshal(v, message) 250 if err != nil { 251 t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err) 252 } 253 // Then, proto message -> bytes. 254 b, err := proto.Marshal(message) 255 if err != nil { 256 t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) 257 } 258 results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) 259 if err != nil { 260 t.Errorf("single-row append %d failed: %v", k, err) 261 } 262 } 263 264 // wait for the result to indicate ready, then validate. 265 results[0].Ready() 266 validateTableConstraints(ctx, t, bqClient, testTable, "after send", 267 withExactRowCount(int64(len(sampleJSONData))), 268 withDistinctValues("name", int64(len(sampleJSONData))), 269 withDistinctValues("value", int64(len(sampleJSONData)))) 270} 271 272func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 273 testTable := dataset.Table(tableIDs.New()) 274 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 275 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 276 } 277 278 m := &testdata.SimpleMessageProto2{} 279 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 280 281 ms, err := mwClient.NewManagedStream(ctx, 282 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 283 WithType(BufferedStream), 284 WithSchemaDescriptor(descriptorProto), 285 ) 286 if err != nil { 287 t.Fatalf("NewManagedStream: %v", err) 288 } 289 290 info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID) 291 if err != nil { 292 t.Errorf("couldn't get stream info: %v", err) 293 } 294 if info.GetType().String() != string(ms.StreamType()) { 295 t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType()) 296 } 297 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 298 withExactRowCount(0)) 299 300 var expectedRows int64 301 for k, mesg := range testSimpleData { 302 b, err := proto.Marshal(mesg) 303 if err != nil { 304 t.Errorf("failed to marshal message %d: %v", k, err) 305 } 306 data := [][]byte{b} 307 results, err := ms.AppendRows(ctx, data, NoStreamOffset) 308 if err != nil { 309 t.Errorf("single-row append %d failed: %v", k, err) 310 } 311 // wait for ack 312 offset, err := results[0].GetResult(ctx) 313 if err != nil { 314 t.Errorf("got error from pending result %d: %v", k, err) 315 } 316 validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k), 317 withExactRowCount(expectedRows), 318 withDistinctValues("name", expectedRows)) 319 320 // move offset and re-validate. 321 flushOffset, err := ms.FlushRows(ctx, offset) 322 if err != nil { 323 t.Errorf("failed to flush offset to %d: %v", offset, err) 324 } 325 expectedRows = flushOffset + 1 326 validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k), 327 withExactRowCount(expectedRows), 328 withDistinctValues("name", expectedRows)) 329 } 330} 331 332func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 333 testTable := dataset.Table(tableIDs.New()) 334 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 335 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 336 } 337 338 m := &testdata.SimpleMessageProto2{} 339 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 340 341 // setup a new stream. 342 ms, err := mwClient.NewManagedStream(ctx, 343 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 344 WithType(CommittedStream), 345 WithSchemaDescriptor(descriptorProto), 346 ) 347 if err != nil { 348 t.Fatalf("NewManagedStream: %v", err) 349 } 350 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 351 withExactRowCount(0)) 352 353 var results []*AppendResult 354 for k, mesg := range testSimpleData { 355 b, err := proto.Marshal(mesg) 356 if err != nil { 357 t.Errorf("failed to marshal message %d: %v", k, err) 358 } 359 data := [][]byte{b} 360 results, err = ms.AppendRows(ctx, data, NoStreamOffset) 361 if err != nil { 362 t.Errorf("single-row append %d failed: %v", k, err) 363 } 364 } 365 // wait for the result to indicate ready, then validate. 366 results[0].Ready() 367 validateTableConstraints(ctx, t, bqClient, testTable, "after send", 368 withExactRowCount(int64(len(testSimpleData)))) 369} 370 371func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 372 testTable := dataset.Table(tableIDs.New()) 373 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 374 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 375 } 376 377 m := &testdata.SimpleMessageProto2{} 378 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 379 380 ms, err := mwClient.NewManagedStream(ctx, 381 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 382 WithType(PendingStream), 383 WithSchemaDescriptor(descriptorProto), 384 ) 385 if err != nil { 386 t.Fatalf("NewManagedStream: %v", err) 387 } 388 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 389 withExactRowCount(0)) 390 391 // Send data. 392 var results []*AppendResult 393 for k, mesg := range testSimpleData { 394 b, err := proto.Marshal(mesg) 395 if err != nil { 396 t.Errorf("failed to marshal message %d: %v", k, err) 397 } 398 data := [][]byte{b} 399 results, err = ms.AppendRows(ctx, data, NoStreamOffset) 400 if err != nil { 401 t.Errorf("single-row append %d failed: %v", k, err) 402 } 403 } 404 results[0].Ready() 405 wantRows := int64(len(testSimpleData)) 406 407 // Mark stream complete. 408 trackedOffset, err := ms.Finalize(ctx) 409 if err != nil { 410 t.Errorf("Finalize: %v", err) 411 } 412 413 if trackedOffset != wantRows { 414 t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows) 415 } 416 417 // Commit stream and validate. 418 resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()}) 419 if err != nil { 420 t.Errorf("client.BatchCommit: %v", err) 421 } 422 if len(resp.StreamErrors) > 0 { 423 t.Errorf("stream errors present: %v", resp.StreamErrors) 424 } 425 validateTableConstraints(ctx, t, bqClient, testTable, "after send", 426 withExactRowCount(int64(len(testSimpleData)))) 427} 428 429func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 430 testedViews := []*view.View{ 431 AppendRequestsView, 432 AppendResponsesView, 433 AppendClientOpenView, 434 } 435 436 if err := view.Register(testedViews...); err != nil { 437 t.Fatalf("couldn't register views: %v", err) 438 } 439 440 testTable := dataset.Table(tableIDs.New()) 441 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 442 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) 443 } 444 445 m := &testdata.SimpleMessageProto2{} 446 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 447 448 // setup a new stream. 449 ms, err := mwClient.NewManagedStream(ctx, 450 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 451 WithType(DefaultStream), 452 WithSchemaDescriptor(descriptorProto), 453 ) 454 if err != nil { 455 t.Fatalf("NewManagedStream: %v", err) 456 } 457 458 var results []*AppendResult 459 for k, mesg := range testSimpleData { 460 b, err := proto.Marshal(mesg) 461 if err != nil { 462 t.Errorf("failed to marshal message %d: %v", k, err) 463 } 464 data := [][]byte{b} 465 results, err = ms.AppendRows(ctx, data, NoStreamOffset) 466 if err != nil { 467 t.Errorf("single-row append %d failed: %v", k, err) 468 } 469 } 470 // wait for the result to indicate ready. 471 results[0].Ready() 472 // Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance 473 // to report. 474 time.Sleep(time.Second) 475 476 for _, tv := range testedViews { 477 metricData, err := view.RetrieveData(tv.Name) 478 if err != nil { 479 t.Errorf("view %q RetrieveData: %v", tv.Name, err) 480 } 481 if len(metricData) > 1 { 482 t.Errorf("%q: only expected 1 row, got %d", tv.Name, len(metricData)) 483 } 484 if len(metricData[0].Tags) != 1 { 485 t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags)) 486 } 487 entry := metricData[0].Data 488 sum, ok := entry.(*view.SumData) 489 if !ok { 490 t.Errorf("unexpected metric type: %T", entry) 491 } 492 got := sum.Value 493 var want int64 494 switch tv { 495 case AppendRequestsView: 496 want = int64(len(testSimpleData)) 497 case AppendResponsesView: 498 want = int64(len(testSimpleData)) 499 case AppendClientOpenView: 500 want = 1 501 } 502 503 // float comparison; diff more than error bound is error 504 if math.Abs(got-float64(want)) > 0.1 { 505 t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want) 506 } 507 } 508} 509 510func TestIntegration_DetectProjectID(t *testing.T) { 511 ctx := context.Background() 512 testCreds := testutil.Credentials(ctx) 513 if testCreds == nil { 514 t.Skip("test credentials not present, skipping") 515 } 516 517 if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil { 518 t.Errorf("test NewClient: %v", err) 519 } 520 521 badTS := testutil.ErroringTokenSource{} 522 523 if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil { 524 t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID) 525 } 526} 527