1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package managedwriter
16
17import (
18	"context"
19	"fmt"
20	"math"
21	"testing"
22	"time"
23
24	"cloud.google.com/go/bigquery"
25	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
26	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
27	"cloud.google.com/go/internal/testutil"
28	"cloud.google.com/go/internal/uid"
29	"go.opencensus.io/stats/view"
30	"google.golang.org/api/option"
31	"google.golang.org/protobuf/encoding/protojson"
32	"google.golang.org/protobuf/proto"
33	"google.golang.org/protobuf/reflect/protodesc"
34	"google.golang.org/protobuf/reflect/protoreflect"
35	"google.golang.org/protobuf/types/descriptorpb"
36	"google.golang.org/protobuf/types/dynamicpb"
37	"google.golang.org/protobuf/types/known/wrapperspb"
38)
39
// Shared fixtures for the integration tests in this file.
var (
	// datasetIDs generates unique dataset IDs with the "managedwriter_test_dataset" prefix.
	datasetIDs         = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	// tableIDs generates unique table IDs with the "table" prefix.
	tableIDs           = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	// defaultTestTimeout bounds the shared context handed to each group of subtests.
	defaultTestTimeout = 30 * time.Second
)
45
// testSimpleData is the set of rows appended by the stream tests.
// It has 5 distinct names and 3 distinct values (1, 2, 3); the tests rely on
// those cardinalities when validating table contents.
var testSimpleData = []*testdata.SimpleMessageProto2{
	{Name: proto.String("one"), Value: proto.Int64(1)},
	{Name: proto.String("two"), Value: proto.Int64(2)},
	{Name: proto.String("three"), Value: proto.Int64(3)},
	{Name: proto.String("four"), Value: proto.Int64(1)},
	{Name: proto.String("five"), Value: proto.Int64(2)},
}
54
55func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
56	if testing.Short() {
57		t.Skip("Integration tests skipped in short mode")
58	}
59	projID := testutil.ProjID()
60	if projID == "" {
61		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
62	}
63	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
64	if ts == nil {
65		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
66	}
67	opts = append(opts, option.WithTokenSource(ts))
68	client, err := NewClient(ctx, projID, opts...)
69	if err != nil {
70		t.Fatalf("couldn't create managedwriter client: %v", err)
71	}
72
73	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
74	if err != nil {
75		t.Fatalf("couldn't create bigquery client: %v", err)
76	}
77	return client, bqClient
78}
79
80// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
81func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) {
82	dataset := bqc.Dataset(datasetIDs.New())
83	if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil {
84		return nil, nil, err
85	}
86	return dataset, func() {
87		if err := dataset.DeleteWithContents(ctx); err != nil {
88			t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err)
89		}
90	}, nil
91}
92
93// setupDynamicDescriptors aids testing when not using a supplied proto
94func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
95	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
96	if err != nil {
97		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
98	}
99
100	descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
101	if err != nil {
102		t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
103	}
104	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
105	if !ok {
106		t.Fatalf("adapted descriptor is not a message descriptor")
107	}
108	return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
109}
110
111func TestIntegration_ManagedWriter(t *testing.T) {
112	mwClient, bqClient := getTestClients(context.Background(), t)
113	defer mwClient.Close()
114	defer bqClient.Close()
115
116	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
117	if err != nil {
118		t.Fatalf("failed to init test dataset: %v", err)
119	}
120	defer cleanup()
121
122	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
123	defer cancel()
124
125	t.Run("group", func(t *testing.T) {
126		t.Run("DefaultStream", func(t *testing.T) {
127			t.Parallel()
128			testDefaultStream(ctx, t, mwClient, bqClient, dataset)
129		})
130		t.Run("DefaultStreamDynamicJSON", func(t *testing.T) {
131			t.Parallel()
132			testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
133		})
134		t.Run("CommittedStream", func(t *testing.T) {
135			t.Parallel()
136			testCommittedStream(ctx, t, mwClient, bqClient, dataset)
137		})
138		t.Run("BufferedStream", func(t *testing.T) {
139			t.Parallel()
140			testBufferedStream(ctx, t, mwClient, bqClient, dataset)
141		})
142		t.Run("PendingStream", func(t *testing.T) {
143			t.Parallel()
144			testPendingStream(ctx, t, mwClient, bqClient, dataset)
145		})
146		t.Run("Instrumentation", func(t *testing.T) {
147			// Don't run this in parallel, we only want to collect stats from this subtest.
148			testInstrumentation(ctx, t, mwClient, bqClient, dataset)
149		})
150	})
151}
152
153func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
154	testTable := dataset.Table(tableIDs.New())
155	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
156		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
157	}
158	// We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation
159	// to send as the stream's schema.
160	m := &testdata.SimpleMessageProto2{}
161	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
162
163	// setup a new stream.
164	ms, err := mwClient.NewManagedStream(ctx,
165		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
166		WithType(DefaultStream),
167		WithSchemaDescriptor(descriptorProto),
168	)
169	if err != nil {
170		t.Fatalf("NewManagedStream: %v", err)
171	}
172	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
173		withExactRowCount(0))
174
175	// First, send the test rows individually.
176	var result *AppendResult
177	for k, mesg := range testSimpleData {
178		b, err := proto.Marshal(mesg)
179		if err != nil {
180			t.Errorf("failed to marshal message %d: %v", k, err)
181		}
182		data := [][]byte{b}
183		result, err = ms.AppendRows(ctx, data, NoStreamOffset)
184		if err != nil {
185			t.Errorf("single-row append %d failed: %v", k, err)
186		}
187	}
188	// wait for the result to indicate ready, then validate.
189	result.Ready()
190	validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
191		withExactRowCount(int64(len(testSimpleData))),
192		withDistinctValues("name", int64(len(testSimpleData))))
193
194	// Now, send the test rows grouped into in a single append.
195	var data [][]byte
196	for k, mesg := range testSimpleData {
197		b, err := proto.Marshal(mesg)
198		if err != nil {
199			t.Errorf("failed to marshal message %d: %v", k, err)
200		}
201		data = append(data, b)
202	}
203	result, err = ms.AppendRows(ctx, data, NoStreamOffset)
204	if err != nil {
205		t.Errorf("grouped-row append failed: %v", err)
206	}
207	// wait for the result to indicate ready, then validate again.  Our total rows have increased, but
208	// cardinality should not.
209	result.Ready()
210	validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
211		withExactRowCount(int64(2*len(testSimpleData))),
212		withDistinctValues("name", int64(len(testSimpleData))),
213		withDistinctValues("value", int64(3)),
214	)
215}
216
217func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
218	testTable := dataset.Table(tableIDs.New())
219	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
220		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
221	}
222
223	md, descriptorProto := setupDynamicDescriptors(t, testdata.SimpleMessageSchema)
224
225	ms, err := mwClient.NewManagedStream(ctx,
226		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
227		WithType(DefaultStream),
228		WithSchemaDescriptor(descriptorProto),
229	)
230	if err != nil {
231		t.Fatalf("NewManagedStream: %v", err)
232	}
233	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
234		withExactRowCount(0))
235
236	sampleJSONData := [][]byte{
237		[]byte(`{"name": "one", "value": 1}`),
238		[]byte(`{"name": "two", "value": 2}`),
239		[]byte(`{"name": "three", "value": 3}`),
240		[]byte(`{"name": "four", "value": 4}`),
241		[]byte(`{"name": "five", "value": 5}`),
242	}
243
244	var result *AppendResult
245	for k, v := range sampleJSONData {
246		message := dynamicpb.NewMessage(md)
247
248		// First, json->proto message
249		err = protojson.Unmarshal(v, message)
250		if err != nil {
251			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
252		}
253		// Then, proto message -> bytes.
254		b, err := proto.Marshal(message)
255		if err != nil {
256			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
257		}
258		result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
259		if err != nil {
260			t.Errorf("single-row append %d failed: %v", k, err)
261		}
262	}
263
264	// wait for the result to indicate ready, then validate.
265	result.Ready()
266	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
267		withExactRowCount(int64(len(sampleJSONData))),
268		withDistinctValues("name", int64(len(sampleJSONData))),
269		withDistinctValues("value", int64(len(sampleJSONData))))
270}
271
272func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
273	testTable := dataset.Table(tableIDs.New())
274	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
275		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
276	}
277
278	m := &testdata.SimpleMessageProto2{}
279	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
280
281	ms, err := mwClient.NewManagedStream(ctx,
282		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
283		WithType(BufferedStream),
284		WithSchemaDescriptor(descriptorProto),
285	)
286	if err != nil {
287		t.Fatalf("NewManagedStream: %v", err)
288	}
289
290	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID)
291	if err != nil {
292		t.Errorf("couldn't get stream info: %v", err)
293	}
294	if info.GetType().String() != string(ms.StreamType()) {
295		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
296	}
297	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
298		withExactRowCount(0))
299
300	var expectedRows int64
301	for k, mesg := range testSimpleData {
302		b, err := proto.Marshal(mesg)
303		if err != nil {
304			t.Errorf("failed to marshal message %d: %v", k, err)
305		}
306		data := [][]byte{b}
307		results, err := ms.AppendRows(ctx, data, NoStreamOffset)
308		if err != nil {
309			t.Errorf("single-row append %d failed: %v", k, err)
310		}
311		// wait for ack
312		offset, err := results.GetResult(ctx)
313		if err != nil {
314			t.Errorf("got error from pending result %d: %v", k, err)
315		}
316		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
317			withExactRowCount(expectedRows),
318			withDistinctValues("name", expectedRows))
319
320		// move offset and re-validate.
321		flushOffset, err := ms.FlushRows(ctx, offset)
322		if err != nil {
323			t.Errorf("failed to flush offset to %d: %v", offset, err)
324		}
325		expectedRows = flushOffset + 1
326		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
327			withExactRowCount(expectedRows),
328			withDistinctValues("name", expectedRows))
329	}
330}
331
332func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
333	testTable := dataset.Table(tableIDs.New())
334	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
335		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
336	}
337
338	m := &testdata.SimpleMessageProto2{}
339	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
340
341	// setup a new stream.
342	ms, err := mwClient.NewManagedStream(ctx,
343		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
344		WithType(CommittedStream),
345		WithSchemaDescriptor(descriptorProto),
346	)
347	if err != nil {
348		t.Fatalf("NewManagedStream: %v", err)
349	}
350	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
351		withExactRowCount(0))
352
353	var result *AppendResult
354	for k, mesg := range testSimpleData {
355		b, err := proto.Marshal(mesg)
356		if err != nil {
357			t.Errorf("failed to marshal message %d: %v", k, err)
358		}
359		data := [][]byte{b}
360		result, err = ms.AppendRows(ctx, data, NoStreamOffset)
361		if err != nil {
362			t.Errorf("single-row append %d failed: %v", k, err)
363		}
364	}
365	// wait for the result to indicate ready, then validate.
366	result.Ready()
367	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
368		withExactRowCount(int64(len(testSimpleData))))
369}
370
371func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
372	testTable := dataset.Table(tableIDs.New())
373	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
374		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
375	}
376
377	m := &testdata.SimpleMessageProto2{}
378	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
379
380	ms, err := mwClient.NewManagedStream(ctx,
381		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
382		WithType(PendingStream),
383		WithSchemaDescriptor(descriptorProto),
384	)
385	if err != nil {
386		t.Fatalf("NewManagedStream: %v", err)
387	}
388	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
389		withExactRowCount(0))
390
391	// Send data.
392	var result *AppendResult
393	for k, mesg := range testSimpleData {
394		b, err := proto.Marshal(mesg)
395		if err != nil {
396			t.Errorf("failed to marshal message %d: %v", k, err)
397		}
398		data := [][]byte{b}
399		result, err = ms.AppendRows(ctx, data, NoStreamOffset)
400		if err != nil {
401			t.Errorf("single-row append %d failed: %v", k, err)
402		}
403	}
404	result.Ready()
405	wantRows := int64(len(testSimpleData))
406
407	// Mark stream complete.
408	trackedOffset, err := ms.Finalize(ctx)
409	if err != nil {
410		t.Errorf("Finalize: %v", err)
411	}
412
413	if trackedOffset != wantRows {
414		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
415	}
416
417	// Commit stream and validate.
418	resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
419	if err != nil {
420		t.Errorf("client.BatchCommit: %v", err)
421	}
422	if len(resp.StreamErrors) > 0 {
423		t.Errorf("stream errors present: %v", resp.StreamErrors)
424	}
425	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
426		withExactRowCount(int64(len(testSimpleData))))
427}
428
429func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
430	testedViews := []*view.View{
431		AppendRequestsView,
432		AppendResponsesView,
433		AppendClientOpenView,
434	}
435
436	if err := view.Register(testedViews...); err != nil {
437		t.Fatalf("couldn't register views: %v", err)
438	}
439
440	testTable := dataset.Table(tableIDs.New())
441	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
442		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
443	}
444
445	m := &testdata.SimpleMessageProto2{}
446	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
447
448	// setup a new stream.
449	ms, err := mwClient.NewManagedStream(ctx,
450		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
451		WithType(DefaultStream),
452		WithSchemaDescriptor(descriptorProto),
453	)
454	if err != nil {
455		t.Fatalf("NewManagedStream: %v", err)
456	}
457
458	var result *AppendResult
459	for k, mesg := range testSimpleData {
460		b, err := proto.Marshal(mesg)
461		if err != nil {
462			t.Errorf("failed to marshal message %d: %v", k, err)
463		}
464		data := [][]byte{b}
465		result, err = ms.AppendRows(ctx, data, NoStreamOffset)
466		if err != nil {
467			t.Errorf("single-row append %d failed: %v", k, err)
468		}
469	}
470	// wait for the result to indicate ready.
471	result.Ready()
472	// Ick.  Stats reporting can't force flushing, and there's a race here.  Sleep to give the recv goroutine a chance
473	// to report.
474	time.Sleep(time.Second)
475
476	for _, tv := range testedViews {
477		metricData, err := view.RetrieveData(tv.Name)
478		if err != nil {
479			t.Errorf("view %q RetrieveData: %v", tv.Name, err)
480		}
481		if len(metricData) > 1 {
482			t.Errorf("%q: only expected 1 row, got %d", tv.Name, len(metricData))
483		}
484		if len(metricData[0].Tags) != 1 {
485			t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags))
486		}
487		entry := metricData[0].Data
488		sum, ok := entry.(*view.SumData)
489		if !ok {
490			t.Errorf("unexpected metric type: %T", entry)
491		}
492		got := sum.Value
493		var want int64
494		switch tv {
495		case AppendRequestsView:
496			want = int64(len(testSimpleData))
497		case AppendResponsesView:
498			want = int64(len(testSimpleData))
499		case AppendClientOpenView:
500			want = 1
501		}
502
503		// float comparison; diff more than error bound is error
504		if math.Abs(got-float64(want)) > 0.1 {
505			t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want)
506		}
507	}
508}
509
510func TestIntegration_DetectProjectID(t *testing.T) {
511	ctx := context.Background()
512	testCreds := testutil.Credentials(ctx)
513	if testCreds == nil {
514		t.Skip("test credentials not present, skipping")
515	}
516
517	if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
518		t.Errorf("test NewClient: %v", err)
519	}
520
521	badTS := testutil.ErroringTokenSource{}
522
523	if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
524		t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
525	}
526}
527
528func TestIntegration_ProtoNormalization(t *testing.T) {
529	mwClient, bqClient := getTestClients(context.Background(), t)
530	defer mwClient.Close()
531	defer bqClient.Close()
532
533	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
534	if err != nil {
535		t.Fatalf("failed to init test dataset: %v", err)
536	}
537	defer cleanup()
538
539	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
540	defer cancel()
541
542	t.Run("group", func(t *testing.T) {
543		t.Run("ComplexType", func(t *testing.T) {
544			t.Parallel()
545			schema := testdata.ComplexTypeSchema
546			mesg := &testdata.ComplexType{
547				NestedRepeatedType: []*testdata.NestedType{
548					{
549						InnerType: []*testdata.InnerType{
550							{Value: []string{"a", "b", "c"}},
551							{Value: []string{"x", "y", "z"}},
552						},
553					},
554				},
555				InnerType: &testdata.InnerType{
556					Value: []string{"top"},
557				},
558			}
559			b, err := proto.Marshal(mesg)
560			if err != nil {
561				t.Fatalf("proto.Marshal: %v", err)
562			}
563			descriptor := (mesg).ProtoReflect().Descriptor()
564			testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
565		})
566		t.Run("WithWellKnownTypes", func(t *testing.T) {
567			t.Parallel()
568			schema := testdata.WithWellKnownTypesSchema
569			mesg := &testdata.WithWellKnownTypes{
570				Int64Value: proto.Int64(123),
571				WrappedInt64: &wrapperspb.Int64Value{
572					Value: 456,
573				},
574				StringValue: []string{"a", "b"},
575				WrappedString: []*wrapperspb.StringValue{
576					{Value: "foo"},
577					{Value: "bar"},
578				},
579			}
580			b, err := proto.Marshal(mesg)
581			if err != nil {
582				t.Fatalf("proto.Marshal: %v", err)
583			}
584			descriptor := (mesg).ProtoReflect().Descriptor()
585			testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b)
586		})
587	})
588}
589
590func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, schema bigquery.Schema, descriptor protoreflect.MessageDescriptor, sampleRow []byte) {
591	testTable := dataset.Table(tableIDs.New())
592	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
593		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
594	}
595
596	dp, err := adapt.NormalizeDescriptor(descriptor)
597	if err != nil {
598		t.Fatalf("NormalizeDescriptor: %v", err)
599	}
600
601	// setup a new stream.
602	ms, err := mwClient.NewManagedStream(ctx,
603		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
604		WithType(DefaultStream),
605		WithSchemaDescriptor(dp),
606	)
607	if err != nil {
608		t.Fatalf("NewManagedStream: %v", err)
609	}
610	result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
611	if err != nil {
612		t.Errorf("append failed: %v", err)
613	}
614
615	_, err = result.GetResult(ctx)
616	if err != nil {
617		t.Errorf("error in response: %v", err)
618	}
619}
620