1// Copyright 2021 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package managedwriter
16
17import (
18	"context"
19	"fmt"
20	"math"
21	"testing"
22	"time"
23
24	"cloud.google.com/go/bigquery"
25	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
26	"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
27	"cloud.google.com/go/internal/testutil"
28	"cloud.google.com/go/internal/uid"
29	"go.opencensus.io/stats/view"
30	"google.golang.org/api/option"
31	"google.golang.org/protobuf/encoding/protojson"
32	"google.golang.org/protobuf/proto"
33	"google.golang.org/protobuf/reflect/protodesc"
34	"google.golang.org/protobuf/reflect/protoreflect"
35	"google.golang.org/protobuf/types/descriptorpb"
36	"google.golang.org/protobuf/types/dynamicpb"
37)
38
var (
	// datasetIDs supplies the IDs used to name the datasets created by these tests.
	datasetIDs         = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	// tableIDs supplies the IDs used to name the tables created by these tests.
	tableIDs           = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	// defaultTestTimeout bounds the context shared by the managed writer subtests.
	defaultTestTimeout = 30 * time.Second
)
44
// testSimpleData is the fixed set of rows appended by the stream tests.
// It contains five rows with five distinct names and three distinct values
// (1, 2 and 3); the tests rely on these cardinalities when validating
// distinct-value counts in the destination table.
var testSimpleData = []*testdata.SimpleMessageProto2{
	{Name: proto.String("one"), Value: proto.Int64(1)},
	{Name: proto.String("two"), Value: proto.Int64(2)},
	{Name: proto.String("three"), Value: proto.Int64(3)},
	{Name: proto.String("four"), Value: proto.Int64(1)},
	{Name: proto.String("five"), Value: proto.Int64(2)},
}
53
54func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
55	if testing.Short() {
56		t.Skip("Integration tests skipped in short mode")
57	}
58	projID := testutil.ProjID()
59	if projID == "" {
60		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
61	}
62	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
63	if ts == nil {
64		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
65	}
66	opts = append(opts, option.WithTokenSource(ts))
67	client, err := NewClient(ctx, projID, opts...)
68	if err != nil {
69		t.Fatalf("couldn't create managedwriter client: %v", err)
70	}
71
72	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
73	if err != nil {
74		t.Fatalf("couldn't create bigquery client: %v", err)
75	}
76	return client, bqClient
77}
78
79// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
80func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) {
81	dataset := bqc.Dataset(datasetIDs.New())
82	if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil {
83		return nil, nil, err
84	}
85	return dataset, func() {
86		if err := dataset.DeleteWithContents(ctx); err != nil {
87			t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err)
88		}
89	}, nil
90}
91
92// setupDynamicDescriptors aids testing when not using a supplied proto
93func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
94	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
95	if err != nil {
96		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
97	}
98
99	descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
100	if err != nil {
101		t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
102	}
103	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
104	if !ok {
105		t.Fatalf("adapted descriptor is not a message descriptor")
106	}
107	return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
108}
109
110func TestIntegration_ManagedWriter(t *testing.T) {
111	mwClient, bqClient := getTestClients(context.Background(), t)
112	defer mwClient.Close()
113	defer bqClient.Close()
114
115	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
116	if err != nil {
117		t.Fatalf("failed to init test dataset: %v", err)
118	}
119	defer cleanup()
120
121	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
122	defer cancel()
123
124	t.Run("group", func(t *testing.T) {
125		t.Run("DefaultStream", func(t *testing.T) {
126			t.Parallel()
127			testDefaultStream(ctx, t, mwClient, bqClient, dataset)
128		})
129		t.Run("DefaultStreamDynamicJSON", func(t *testing.T) {
130			t.Parallel()
131			testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
132		})
133		t.Run("CommittedStream", func(t *testing.T) {
134			t.Parallel()
135			testCommittedStream(ctx, t, mwClient, bqClient, dataset)
136		})
137		t.Run("BufferedStream", func(t *testing.T) {
138			t.Parallel()
139			testBufferedStream(ctx, t, mwClient, bqClient, dataset)
140		})
141		t.Run("PendingStream", func(t *testing.T) {
142			t.Parallel()
143			testPendingStream(ctx, t, mwClient, bqClient, dataset)
144		})
145		t.Run("Instrumentation", func(t *testing.T) {
146			// Don't run this in parallel, we only want to collect stats from this subtest.
147			testInstrumentation(ctx, t, mwClient, bqClient, dataset)
148		})
149	})
150
151}
152
153func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
154	testTable := dataset.Table(tableIDs.New())
155	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
156		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
157	}
158	// We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation
159	// to send as the stream's schema.
160	m := &testdata.SimpleMessageProto2{}
161	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
162
163	// setup a new stream.
164	ms, err := mwClient.NewManagedStream(ctx,
165		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
166		WithType(DefaultStream),
167		WithSchemaDescriptor(descriptorProto),
168	)
169	if err != nil {
170		t.Fatalf("NewManagedStream: %v", err)
171	}
172	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
173		withExactRowCount(0))
174
175	// First, send the test rows individually.
176	var results []*AppendResult
177	for k, mesg := range testSimpleData {
178		b, err := proto.Marshal(mesg)
179		if err != nil {
180			t.Errorf("failed to marshal message %d: %v", k, err)
181		}
182		data := [][]byte{b}
183		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
184		if err != nil {
185			t.Errorf("single-row append %d failed: %v", k, err)
186		}
187	}
188	// wait for the result to indicate ready, then validate.
189	results[0].Ready()
190	validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
191		withExactRowCount(int64(len(testSimpleData))),
192		withDistinctValues("name", int64(len(testSimpleData))))
193
194	// Now, send the test rows grouped into in a single append.
195	var data [][]byte
196	for k, mesg := range testSimpleData {
197		b, err := proto.Marshal(mesg)
198		if err != nil {
199			t.Errorf("failed to marshal message %d: %v", k, err)
200		}
201		data = append(data, b)
202	}
203	results, err = ms.AppendRows(ctx, data, NoStreamOffset)
204	if err != nil {
205		t.Errorf("grouped-row append failed: %v", err)
206	}
207	// wait for the result to indicate ready, then validate again.  Our total rows have increased, but
208	// cardinality should not.
209	results[0].Ready()
210	validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
211		withExactRowCount(int64(2*len(testSimpleData))),
212		withDistinctValues("name", int64(len(testSimpleData))),
213		withDistinctValues("value", int64(3)),
214	)
215}
216
217func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
218	testTable := dataset.Table(tableIDs.New())
219	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
220		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
221	}
222
223	md, descriptorProto := setupDynamicDescriptors(t, testdata.SimpleMessageSchema)
224
225	ms, err := mwClient.NewManagedStream(ctx,
226		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
227		WithType(DefaultStream),
228		WithSchemaDescriptor(descriptorProto),
229	)
230	if err != nil {
231		t.Fatalf("NewManagedStream: %v", err)
232	}
233	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
234		withExactRowCount(0))
235
236	sampleJSONData := [][]byte{
237		[]byte(`{"name": "one", "value": 1}`),
238		[]byte(`{"name": "two", "value": 2}`),
239		[]byte(`{"name": "three", "value": 3}`),
240		[]byte(`{"name": "four", "value": 4}`),
241		[]byte(`{"name": "five", "value": 5}`),
242	}
243
244	var results []*AppendResult
245	for k, v := range sampleJSONData {
246		message := dynamicpb.NewMessage(md)
247
248		// First, json->proto message
249		err = protojson.Unmarshal(v, message)
250		if err != nil {
251			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
252		}
253		// Then, proto message -> bytes.
254		b, err := proto.Marshal(message)
255		if err != nil {
256			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
257		}
258		results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
259		if err != nil {
260			t.Errorf("single-row append %d failed: %v", k, err)
261		}
262	}
263
264	// wait for the result to indicate ready, then validate.
265	results[0].Ready()
266	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
267		withExactRowCount(int64(len(sampleJSONData))),
268		withDistinctValues("name", int64(len(sampleJSONData))),
269		withDistinctValues("value", int64(len(sampleJSONData))))
270}
271
272func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
273	testTable := dataset.Table(tableIDs.New())
274	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
275		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
276	}
277
278	m := &testdata.SimpleMessageProto2{}
279	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
280
281	ms, err := mwClient.NewManagedStream(ctx,
282		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
283		WithType(BufferedStream),
284		WithSchemaDescriptor(descriptorProto),
285	)
286	if err != nil {
287		t.Fatalf("NewManagedStream: %v", err)
288	}
289
290	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID)
291	if err != nil {
292		t.Errorf("couldn't get stream info: %v", err)
293	}
294	if info.GetType().String() != string(ms.StreamType()) {
295		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
296	}
297	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
298		withExactRowCount(0))
299
300	var expectedRows int64
301	for k, mesg := range testSimpleData {
302		b, err := proto.Marshal(mesg)
303		if err != nil {
304			t.Errorf("failed to marshal message %d: %v", k, err)
305		}
306		data := [][]byte{b}
307		results, err := ms.AppendRows(ctx, data, NoStreamOffset)
308		if err != nil {
309			t.Errorf("single-row append %d failed: %v", k, err)
310		}
311		// wait for ack
312		offset, err := results[0].GetResult(ctx)
313		if err != nil {
314			t.Errorf("got error from pending result %d: %v", k, err)
315		}
316		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
317			withExactRowCount(expectedRows),
318			withDistinctValues("name", expectedRows))
319
320		// move offset and re-validate.
321		flushOffset, err := ms.FlushRows(ctx, offset)
322		if err != nil {
323			t.Errorf("failed to flush offset to %d: %v", offset, err)
324		}
325		expectedRows = flushOffset + 1
326		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
327			withExactRowCount(expectedRows),
328			withDistinctValues("name", expectedRows))
329	}
330}
331
332func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
333	testTable := dataset.Table(tableIDs.New())
334	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
335		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
336	}
337
338	m := &testdata.SimpleMessageProto2{}
339	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
340
341	// setup a new stream.
342	ms, err := mwClient.NewManagedStream(ctx,
343		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
344		WithType(CommittedStream),
345		WithSchemaDescriptor(descriptorProto),
346	)
347	if err != nil {
348		t.Fatalf("NewManagedStream: %v", err)
349	}
350	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
351		withExactRowCount(0))
352
353	var results []*AppendResult
354	for k, mesg := range testSimpleData {
355		b, err := proto.Marshal(mesg)
356		if err != nil {
357			t.Errorf("failed to marshal message %d: %v", k, err)
358		}
359		data := [][]byte{b}
360		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
361		if err != nil {
362			t.Errorf("single-row append %d failed: %v", k, err)
363		}
364	}
365	// wait for the result to indicate ready, then validate.
366	results[0].Ready()
367	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
368		withExactRowCount(int64(len(testSimpleData))))
369}
370
371func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
372	testTable := dataset.Table(tableIDs.New())
373	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
374		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
375	}
376
377	m := &testdata.SimpleMessageProto2{}
378	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
379
380	ms, err := mwClient.NewManagedStream(ctx,
381		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
382		WithType(PendingStream),
383		WithSchemaDescriptor(descriptorProto),
384	)
385	if err != nil {
386		t.Fatalf("NewManagedStream: %v", err)
387	}
388	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
389		withExactRowCount(0))
390
391	// Send data.
392	var results []*AppendResult
393	for k, mesg := range testSimpleData {
394		b, err := proto.Marshal(mesg)
395		if err != nil {
396			t.Errorf("failed to marshal message %d: %v", k, err)
397		}
398		data := [][]byte{b}
399		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
400		if err != nil {
401			t.Errorf("single-row append %d failed: %v", k, err)
402		}
403	}
404	results[0].Ready()
405	wantRows := int64(len(testSimpleData))
406
407	// Mark stream complete.
408	trackedOffset, err := ms.Finalize(ctx)
409	if err != nil {
410		t.Errorf("Finalize: %v", err)
411	}
412
413	if trackedOffset != wantRows {
414		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
415	}
416
417	// Commit stream and validate.
418	resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
419	if err != nil {
420		t.Errorf("client.BatchCommit: %v", err)
421	}
422	if len(resp.StreamErrors) > 0 {
423		t.Errorf("stream errors present: %v", resp.StreamErrors)
424	}
425	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
426		withExactRowCount(int64(len(testSimpleData))))
427}
428
429func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
430	testedViews := []*view.View{
431		AppendRequestsView,
432		AppendResponsesView,
433		AppendClientOpenView,
434	}
435
436	if err := view.Register(testedViews...); err != nil {
437		t.Fatalf("couldn't register views: %v", err)
438	}
439
440	testTable := dataset.Table(tableIDs.New())
441	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
442		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
443	}
444
445	m := &testdata.SimpleMessageProto2{}
446	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
447
448	// setup a new stream.
449	ms, err := mwClient.NewManagedStream(ctx,
450		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
451		WithType(DefaultStream),
452		WithSchemaDescriptor(descriptorProto),
453	)
454	if err != nil {
455		t.Fatalf("NewManagedStream: %v", err)
456	}
457
458	var results []*AppendResult
459	for k, mesg := range testSimpleData {
460		b, err := proto.Marshal(mesg)
461		if err != nil {
462			t.Errorf("failed to marshal message %d: %v", k, err)
463		}
464		data := [][]byte{b}
465		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
466		if err != nil {
467			t.Errorf("single-row append %d failed: %v", k, err)
468		}
469	}
470	// wait for the result to indicate ready.
471	results[0].Ready()
472	// Ick.  Stats reporting can't force flushing, and there's a race here.  Sleep to give the recv goroutine a chance
473	// to report.
474	time.Sleep(time.Second)
475
476	for _, tv := range testedViews {
477		metricData, err := view.RetrieveData(tv.Name)
478		if err != nil {
479			t.Errorf("view %q RetrieveData: %v", tv.Name, err)
480		}
481		if len(metricData) > 1 {
482			t.Errorf("%q: only expected 1 row, got %d", tv.Name, len(metricData))
483		}
484		if len(metricData[0].Tags) != 1 {
485			t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags))
486		}
487		entry := metricData[0].Data
488		sum, ok := entry.(*view.SumData)
489		if !ok {
490			t.Errorf("unexpected metric type: %T", entry)
491		}
492		got := sum.Value
493		var want int64
494		switch tv {
495		case AppendRequestsView:
496			want = int64(len(testSimpleData))
497		case AppendResponsesView:
498			want = int64(len(testSimpleData))
499		case AppendClientOpenView:
500			want = 1
501		}
502
503		// float comparison; diff more than error bound is error
504		if math.Abs(got-float64(want)) > 0.1 {
505			t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want)
506		}
507	}
508}
509
510func TestIntegration_DetectProjectID(t *testing.T) {
511	ctx := context.Background()
512	testCreds := testutil.Credentials(ctx)
513	if testCreds == nil {
514		t.Skip("test credentials not present, skipping")
515	}
516
517	if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
518		t.Errorf("test NewClient: %v", err)
519	}
520
521	badTS := testutil.ErroringTokenSource{}
522
523	if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
524		t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
525	}
526}
527