// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"context"
	"fmt"
	"math"
	"testing"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/internal/uid"
	"go.opencensus.io/stats/view"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
	// datasetIDs supplies the IDs used to name the datasets created by these
	// integration tests.
	datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	// tableIDs supplies the IDs used to name the tables created by these
	// integration tests.
	tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	// defaultTestTimeout bounds the context shared by the subtests of a
	// top-level integration test.
	defaultTestTimeout = 30 * time.Second
)

// testSimpleData is the shared set of rows appended by the stream tests.
// It has cardinality 5 for names and 3 for values (the values 1 and 2 each
// appear twice), which the distinct-value assertions in the tests rely on.
var testSimpleData = []*testdata.SimpleMessageProto2{
	{Name: proto.String("one"), Value: proto.Int64(1)},
	{Name: proto.String("two"), Value: proto.Int64(2)},
	{Name: proto.String("three"), Value: proto.Int64(3)},
	{Name: proto.String("four"), Value: proto.Int64(1)},
	{Name: proto.String("five"), Value: proto.Int64(2)},
}

55func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { 56 if testing.Short() { 57 t.Skip("Integration tests skipped in short mode") 58 } 59 projID := testutil.ProjID() 60 if projID == "" { 61 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") 62 } 63 ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery") 64 if ts == nil { 65 t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") 66 } 67 opts = append(opts, option.WithTokenSource(ts)) 68 client, err := NewClient(ctx, projID, opts...) 69 if err != nil { 70 t.Fatalf("couldn't create managedwriter client: %v", err) 71 } 72 73 bqClient, err := bigquery.NewClient(ctx, projID, opts...) 74 if err != nil { 75 t.Fatalf("couldn't create bigquery client: %v", err) 76 } 77 return client, bqClient 78} 79 80// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred. 81func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) { 82 dataset := bqc.Dataset(datasetIDs.New()) 83 if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil { 84 return nil, nil, err 85 } 86 return dataset, func() { 87 if err := dataset.DeleteWithContents(ctx); err != nil { 88 t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err) 89 } 90 }, nil 91} 92 93// setupDynamicDescriptors aids testing when not using a supplied proto 94func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) { 95 convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema) 96 if err != nil { 97 t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err) 98 } 99 100 descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root") 101 if err != nil { 102 t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err) 103 } 104 
messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) 105 if !ok { 106 t.Fatalf("adapted descriptor is not a message descriptor") 107 } 108 return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) 109} 110 111func TestIntegration_ManagedWriter(t *testing.T) { 112 mwClient, bqClient := getTestClients(context.Background(), t) 113 defer mwClient.Close() 114 defer bqClient.Close() 115 116 dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1") 117 if err != nil { 118 t.Fatalf("failed to init test dataset: %v", err) 119 } 120 defer cleanup() 121 122 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 123 defer cancel() 124 125 t.Run("group", func(t *testing.T) { 126 t.Run("DefaultStream", func(t *testing.T) { 127 t.Parallel() 128 testDefaultStream(ctx, t, mwClient, bqClient, dataset) 129 }) 130 t.Run("DefaultStreamDynamicJSON", func(t *testing.T) { 131 t.Parallel() 132 testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset) 133 }) 134 t.Run("CommittedStream", func(t *testing.T) { 135 t.Parallel() 136 testCommittedStream(ctx, t, mwClient, bqClient, dataset) 137 }) 138 t.Run("BufferedStream", func(t *testing.T) { 139 t.Parallel() 140 testBufferedStream(ctx, t, mwClient, bqClient, dataset) 141 }) 142 t.Run("PendingStream", func(t *testing.T) { 143 t.Parallel() 144 testPendingStream(ctx, t, mwClient, bqClient, dataset) 145 }) 146 t.Run("Instrumentation", func(t *testing.T) { 147 // Don't run this in parallel, we only want to collect stats from this subtest. 
148 testInstrumentation(ctx, t, mwClient, bqClient, dataset) 149 }) 150 }) 151} 152 153func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 154 testTable := dataset.Table(tableIDs.New()) 155 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 156 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) 157 } 158 // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation 159 // to send as the stream's schema. 160 m := &testdata.SimpleMessageProto2{} 161 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 162 163 // setup a new stream. 164 ms, err := mwClient.NewManagedStream(ctx, 165 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 166 WithType(DefaultStream), 167 WithSchemaDescriptor(descriptorProto), 168 ) 169 if err != nil { 170 t.Fatalf("NewManagedStream: %v", err) 171 } 172 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 173 withExactRowCount(0)) 174 175 // First, send the test rows individually. 176 var result *AppendResult 177 for k, mesg := range testSimpleData { 178 b, err := proto.Marshal(mesg) 179 if err != nil { 180 t.Errorf("failed to marshal message %d: %v", k, err) 181 } 182 data := [][]byte{b} 183 result, err = ms.AppendRows(ctx, data, NoStreamOffset) 184 if err != nil { 185 t.Errorf("single-row append %d failed: %v", k, err) 186 } 187 } 188 // wait for the result to indicate ready, then validate. 189 result.Ready() 190 validateTableConstraints(ctx, t, bqClient, testTable, "after first send round", 191 withExactRowCount(int64(len(testSimpleData))), 192 withDistinctValues("name", int64(len(testSimpleData)))) 193 194 // Now, send the test rows grouped into in a single append. 
195 var data [][]byte 196 for k, mesg := range testSimpleData { 197 b, err := proto.Marshal(mesg) 198 if err != nil { 199 t.Errorf("failed to marshal message %d: %v", k, err) 200 } 201 data = append(data, b) 202 } 203 result, err = ms.AppendRows(ctx, data, NoStreamOffset) 204 if err != nil { 205 t.Errorf("grouped-row append failed: %v", err) 206 } 207 // wait for the result to indicate ready, then validate again. Our total rows have increased, but 208 // cardinality should not. 209 result.Ready() 210 validateTableConstraints(ctx, t, bqClient, testTable, "after second send round", 211 withExactRowCount(int64(2*len(testSimpleData))), 212 withDistinctValues("name", int64(len(testSimpleData))), 213 withDistinctValues("value", int64(3)), 214 ) 215} 216 217func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 218 testTable := dataset.Table(tableIDs.New()) 219 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 220 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 221 } 222 223 md, descriptorProto := setupDynamicDescriptors(t, testdata.SimpleMessageSchema) 224 225 ms, err := mwClient.NewManagedStream(ctx, 226 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 227 WithType(DefaultStream), 228 WithSchemaDescriptor(descriptorProto), 229 ) 230 if err != nil { 231 t.Fatalf("NewManagedStream: %v", err) 232 } 233 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 234 withExactRowCount(0)) 235 236 sampleJSONData := [][]byte{ 237 []byte(`{"name": "one", "value": 1}`), 238 []byte(`{"name": "two", "value": 2}`), 239 []byte(`{"name": "three", "value": 3}`), 240 []byte(`{"name": "four", "value": 4}`), 241 []byte(`{"name": "five", "value": 5}`), 242 } 243 244 var result *AppendResult 245 for k, v := range 
sampleJSONData { 246 message := dynamicpb.NewMessage(md) 247 248 // First, json->proto message 249 err = protojson.Unmarshal(v, message) 250 if err != nil { 251 t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err) 252 } 253 // Then, proto message -> bytes. 254 b, err := proto.Marshal(message) 255 if err != nil { 256 t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) 257 } 258 result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) 259 if err != nil { 260 t.Errorf("single-row append %d failed: %v", k, err) 261 } 262 } 263 264 // wait for the result to indicate ready, then validate. 265 result.Ready() 266 validateTableConstraints(ctx, t, bqClient, testTable, "after send", 267 withExactRowCount(int64(len(sampleJSONData))), 268 withDistinctValues("name", int64(len(sampleJSONData))), 269 withDistinctValues("value", int64(len(sampleJSONData)))) 270} 271 272func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 273 testTable := dataset.Table(tableIDs.New()) 274 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 275 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 276 } 277 278 m := &testdata.SimpleMessageProto2{} 279 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 280 281 ms, err := mwClient.NewManagedStream(ctx, 282 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 283 WithType(BufferedStream), 284 WithSchemaDescriptor(descriptorProto), 285 ) 286 if err != nil { 287 t.Fatalf("NewManagedStream: %v", err) 288 } 289 290 info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID) 291 if err != nil { 292 t.Errorf("couldn't get stream info: %v", err) 293 } 294 if info.GetType().String() != string(ms.StreamType()) { 295 t.Errorf("mismatch on stream type, got 
%s want %s", info.GetType(), ms.StreamType()) 296 } 297 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 298 withExactRowCount(0)) 299 300 var expectedRows int64 301 for k, mesg := range testSimpleData { 302 b, err := proto.Marshal(mesg) 303 if err != nil { 304 t.Errorf("failed to marshal message %d: %v", k, err) 305 } 306 data := [][]byte{b} 307 results, err := ms.AppendRows(ctx, data, NoStreamOffset) 308 if err != nil { 309 t.Errorf("single-row append %d failed: %v", k, err) 310 } 311 // wait for ack 312 offset, err := results.GetResult(ctx) 313 if err != nil { 314 t.Errorf("got error from pending result %d: %v", k, err) 315 } 316 validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k), 317 withExactRowCount(expectedRows), 318 withDistinctValues("name", expectedRows)) 319 320 // move offset and re-validate. 321 flushOffset, err := ms.FlushRows(ctx, offset) 322 if err != nil { 323 t.Errorf("failed to flush offset to %d: %v", offset, err) 324 } 325 expectedRows = flushOffset + 1 326 validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k), 327 withExactRowCount(expectedRows), 328 withDistinctValues("name", expectedRows)) 329 } 330} 331 332func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 333 testTable := dataset.Table(tableIDs.New()) 334 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 335 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 336 } 337 338 m := &testdata.SimpleMessageProto2{} 339 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 340 341 // setup a new stream. 
342 ms, err := mwClient.NewManagedStream(ctx, 343 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 344 WithType(CommittedStream), 345 WithSchemaDescriptor(descriptorProto), 346 ) 347 if err != nil { 348 t.Fatalf("NewManagedStream: %v", err) 349 } 350 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 351 withExactRowCount(0)) 352 353 var result *AppendResult 354 for k, mesg := range testSimpleData { 355 b, err := proto.Marshal(mesg) 356 if err != nil { 357 t.Errorf("failed to marshal message %d: %v", k, err) 358 } 359 data := [][]byte{b} 360 result, err = ms.AppendRows(ctx, data, NoStreamOffset) 361 if err != nil { 362 t.Errorf("single-row append %d failed: %v", k, err) 363 } 364 } 365 // wait for the result to indicate ready, then validate. 366 result.Ready() 367 validateTableConstraints(ctx, t, bqClient, testTable, "after send", 368 withExactRowCount(int64(len(testSimpleData)))) 369} 370 371func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 372 testTable := dataset.Table(tableIDs.New()) 373 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 374 t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) 375 } 376 377 m := &testdata.SimpleMessageProto2{} 378 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 379 380 ms, err := mwClient.NewManagedStream(ctx, 381 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 382 WithType(PendingStream), 383 WithSchemaDescriptor(descriptorProto), 384 ) 385 if err != nil { 386 t.Fatalf("NewManagedStream: %v", err) 387 } 388 validateTableConstraints(ctx, t, bqClient, testTable, "before send", 389 withExactRowCount(0)) 390 391 // Send data. 
392 var result *AppendResult 393 for k, mesg := range testSimpleData { 394 b, err := proto.Marshal(mesg) 395 if err != nil { 396 t.Errorf("failed to marshal message %d: %v", k, err) 397 } 398 data := [][]byte{b} 399 result, err = ms.AppendRows(ctx, data, NoStreamOffset) 400 if err != nil { 401 t.Errorf("single-row append %d failed: %v", k, err) 402 } 403 } 404 result.Ready() 405 wantRows := int64(len(testSimpleData)) 406 407 // Mark stream complete. 408 trackedOffset, err := ms.Finalize(ctx) 409 if err != nil { 410 t.Errorf("Finalize: %v", err) 411 } 412 413 if trackedOffset != wantRows { 414 t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows) 415 } 416 417 // Commit stream and validate. 418 resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()}) 419 if err != nil { 420 t.Errorf("client.BatchCommit: %v", err) 421 } 422 if len(resp.StreamErrors) > 0 { 423 t.Errorf("stream errors present: %v", resp.StreamErrors) 424 } 425 validateTableConstraints(ctx, t, bqClient, testTable, "after send", 426 withExactRowCount(int64(len(testSimpleData)))) 427} 428 429func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { 430 testedViews := []*view.View{ 431 AppendRequestsView, 432 AppendResponsesView, 433 AppendClientOpenView, 434 } 435 436 if err := view.Register(testedViews...); err != nil { 437 t.Fatalf("couldn't register views: %v", err) 438 } 439 440 testTable := dataset.Table(tableIDs.New()) 441 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { 442 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) 443 } 444 445 m := &testdata.SimpleMessageProto2{} 446 descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) 447 448 // setup a new stream. 
449 ms, err := mwClient.NewManagedStream(ctx, 450 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 451 WithType(DefaultStream), 452 WithSchemaDescriptor(descriptorProto), 453 ) 454 if err != nil { 455 t.Fatalf("NewManagedStream: %v", err) 456 } 457 458 var result *AppendResult 459 for k, mesg := range testSimpleData { 460 b, err := proto.Marshal(mesg) 461 if err != nil { 462 t.Errorf("failed to marshal message %d: %v", k, err) 463 } 464 data := [][]byte{b} 465 result, err = ms.AppendRows(ctx, data, NoStreamOffset) 466 if err != nil { 467 t.Errorf("single-row append %d failed: %v", k, err) 468 } 469 } 470 // wait for the result to indicate ready. 471 result.Ready() 472 // Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance 473 // to report. 474 time.Sleep(time.Second) 475 476 for _, tv := range testedViews { 477 metricData, err := view.RetrieveData(tv.Name) 478 if err != nil { 479 t.Errorf("view %q RetrieveData: %v", tv.Name, err) 480 } 481 if len(metricData) > 1 { 482 t.Errorf("%q: only expected 1 row, got %d", tv.Name, len(metricData)) 483 } 484 if len(metricData[0].Tags) != 1 { 485 t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags)) 486 } 487 entry := metricData[0].Data 488 sum, ok := entry.(*view.SumData) 489 if !ok { 490 t.Errorf("unexpected metric type: %T", entry) 491 } 492 got := sum.Value 493 var want int64 494 switch tv { 495 case AppendRequestsView: 496 want = int64(len(testSimpleData)) 497 case AppendResponsesView: 498 want = int64(len(testSimpleData)) 499 case AppendClientOpenView: 500 want = 1 501 } 502 503 // float comparison; diff more than error bound is error 504 if math.Abs(got-float64(want)) > 0.1 { 505 t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want) 506 } 507 } 508} 509 510func TestIntegration_DetectProjectID(t *testing.T) { 511 ctx := context.Background() 512 
testCreds := testutil.Credentials(ctx) 513 if testCreds == nil { 514 t.Skip("test credentials not present, skipping") 515 } 516 517 if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil { 518 t.Errorf("test NewClient: %v", err) 519 } 520 521 badTS := testutil.ErroringTokenSource{} 522 523 if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil { 524 t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID) 525 } 526} 527 528func TestIntegration_ProtoNormalization(t *testing.T) { 529 mwClient, bqClient := getTestClients(context.Background(), t) 530 defer mwClient.Close() 531 defer bqClient.Close() 532 533 dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1") 534 if err != nil { 535 t.Fatalf("failed to init test dataset: %v", err) 536 } 537 defer cleanup() 538 539 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 540 defer cancel() 541 542 t.Run("group", func(t *testing.T) { 543 t.Run("ComplexType", func(t *testing.T) { 544 t.Parallel() 545 schema := testdata.ComplexTypeSchema 546 mesg := &testdata.ComplexType{ 547 NestedRepeatedType: []*testdata.NestedType{ 548 { 549 InnerType: []*testdata.InnerType{ 550 {Value: []string{"a", "b", "c"}}, 551 {Value: []string{"x", "y", "z"}}, 552 }, 553 }, 554 }, 555 InnerType: &testdata.InnerType{ 556 Value: []string{"top"}, 557 }, 558 } 559 b, err := proto.Marshal(mesg) 560 if err != nil { 561 t.Fatalf("proto.Marshal: %v", err) 562 } 563 descriptor := (mesg).ProtoReflect().Descriptor() 564 testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b) 565 }) 566 t.Run("WithWellKnownTypes", func(t *testing.T) { 567 t.Parallel() 568 schema := testdata.WithWellKnownTypesSchema 569 mesg := &testdata.WithWellKnownTypes{ 570 Int64Value: proto.Int64(123), 571 WrappedInt64: &wrapperspb.Int64Value{ 572 Value: 456, 573 }, 574 
StringValue: []string{"a", "b"}, 575 WrappedString: []*wrapperspb.StringValue{ 576 {Value: "foo"}, 577 {Value: "bar"}, 578 }, 579 } 580 b, err := proto.Marshal(mesg) 581 if err != nil { 582 t.Fatalf("proto.Marshal: %v", err) 583 } 584 descriptor := (mesg).ProtoReflect().Descriptor() 585 testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b) 586 }) 587 }) 588} 589 590func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, schema bigquery.Schema, descriptor protoreflect.MessageDescriptor, sampleRow []byte) { 591 testTable := dataset.Table(tableIDs.New()) 592 if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { 593 t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) 594 } 595 596 dp, err := adapt.NormalizeDescriptor(descriptor) 597 if err != nil { 598 t.Fatalf("NormalizeDescriptor: %v", err) 599 } 600 601 // setup a new stream. 602 ms, err := mwClient.NewManagedStream(ctx, 603 WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), 604 WithType(DefaultStream), 605 WithSchemaDescriptor(dp), 606 ) 607 if err != nil { 608 t.Fatalf("NewManagedStream: %v", err) 609 } 610 result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset) 611 if err != nil { 612 t.Errorf("append failed: %v", err) 613 } 614 615 _, err = result.GetResult(ctx) 616 if err != nil { 617 t.Errorf("error in response: %v", err) 618 } 619} 620