// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"reflect"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// NoDedupeID indicates a streaming insert row wants to opt out of best-effort
// deduplication.
// It is EXPERIMENTAL and subject to change or removal without notice.
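//
// To opt out, a ValueSaver returns NoDedupeID as the insert ID from its Save
// method. A minimal sketch (the rawRow type below is hypothetical, not part of
// this package):
//
//	type rawRow struct {
//		fields map[string]Value
//	}
//
//	func (r *rawRow) Save() (map[string]Value, string, error) {
//		return r.fields, NoDedupeID, nil
//	}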
const NoDedupeID = "NoDedupeID"

// An Inserter does streaming inserts into a BigQuery table.
// It is safe for concurrent use.
type Inserter struct {
	t *Table

	// SkipInvalidRows causes rows containing invalid data to be silently
	// ignored. The default value is false, which causes the entire request to
	// fail if there is an attempt to insert an invalid row.
	SkipInvalidRows bool

	// IgnoreUnknownValues causes values not matching the schema to be ignored.
	// The default value is false, which causes records containing such values
	// to be treated as invalid records.
	IgnoreUnknownValues bool

	// A TableTemplateSuffix allows Inserters to create tables automatically.
	//
	// Experimental: this option may be modified or removed in future versions,
	// regardless of any other documented package stability guarantees. In general,
	// the BigQuery team recommends the use of partitioned tables over sharding
	// tables by suffix.
	//
	// When you specify a suffix, the table you upload data to
	// will be used as a template for creating a new table, with the same schema,
	// called <table> + <suffix>.
	//
	// More information is available at
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
	TableTemplateSuffix string
}

// Inserter returns an Inserter that can be used to append rows to t.
// The returned Inserter may optionally be further configured before its Put method is called.
//
// To stream rows into a date-partitioned table at a particular date, add the
// $yyyymmdd suffix to the table name when constructing the Table.
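//
// For example, a minimal sketch (the client variable is assumed to be a
// configured *Client; the dataset and table names are placeholders):
//
//	ins := client.Dataset("my_dataset").Table("my_table").Inserter()
//	ins.SkipInvalidRows = true
//	ins.IgnoreUnknownValues = true
//
//	// Streaming into one partition of a date-partitioned table.
//	partIns := client.Dataset("my_dataset").Table("my_table$20240101").Inserter()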
func (t *Table) Inserter() *Inserter {
	return &Inserter{t: t}
}

// Uploader calls Inserter.
//
// Deprecated: use Table.Inserter instead.
func (t *Table) Uploader() *Inserter { return t.Inserter() }

// Put uploads one or more rows to the BigQuery service.
//
// If src is a ValueSaver, then its Save method is called to produce a row for uploading.
//
// If src is a struct or pointer to a struct, then a schema is inferred from it
// and used to create a StructSaver. The InsertID of the StructSaver will be
// empty.
//
// If src is a slice of ValueSavers, structs, or struct pointers, then each
// element of the slice is treated as above, and multiple rows are uploaded.
//
// Put returns a PutMultiError if one or more rows failed to be uploaded.
// The PutMultiError contains a RowInsertionError for each failed row.
//
// Put will retry on temporary errors (see
// https://cloud.google.com/bigquery/troubleshooting-errors). This can result
// in duplicate rows if you do not use insert IDs. Also, if the error persists,
// the call will retry indefinitely. Pass a context with a timeout to prevent
// hanging calls.
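//
// A typical call looks like the sketch below (the Item struct, the ins
// Inserter, and ctx are assumptions for illustration):
//
//	type Item struct {
//		Name  string
//		Count int
//	}
//
//	items := []*Item{{Name: "a", Count: 1}, {Name: "b", Count: 2}}
//	if err := ins.Put(ctx, items); err != nil {
//		if multiErr, ok := err.(PutMultiError); ok {
//			for _, rowErr := range multiErr {
//				// Inspect rowErr.RowIndex and rowErr.Errors for the failed row.
//			}
//		}
//		return err
//	}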
func (u *Inserter) Put(ctx context.Context, src interface{}) (err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Inserter.Put")
	defer func() { trace.EndSpan(ctx, err) }()

	savers, err := valueSavers(src)
	if err != nil {
		return err
	}
	return u.putMulti(ctx, savers)
}

func valueSavers(src interface{}) ([]ValueSaver, error) {
	saver, ok, err := toValueSaver(src)
	if err != nil {
		return nil, err
	}
	if ok {
		return []ValueSaver{saver}, nil
	}
	srcVal := reflect.ValueOf(src)
	if srcVal.Kind() != reflect.Slice {
		return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src)
	}
	var savers []ValueSaver
	for i := 0; i < srcVal.Len(); i++ {
		s := srcVal.Index(i).Interface()
		saver, ok, err := toValueSaver(s)
		if err != nil {
			return nil, err
		}
		if !ok {
			return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s)
		}
		savers = append(savers, saver)
	}
	return savers, nil
}

// Make a ValueSaver from x, which must implement ValueSaver already
// or be a struct or pointer to struct.
func toValueSaver(x interface{}) (ValueSaver, bool, error) {
	if _, ok := x.(StructSaver); ok {
		return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver")
	}
	var insertID string
	// Handle StructSavers specially so we can infer the schema if necessary.
	if ss, ok := x.(*StructSaver); ok && ss.Schema == nil {
		x = ss.Struct
		insertID = ss.InsertID
		// Fall through so we can infer the schema.
	}
	if saver, ok := x.(ValueSaver); ok {
		return saver, ok, nil
	}
	v := reflect.ValueOf(x)
	// Support Put with []interface{}
	if v.Kind() == reflect.Interface {
		v = v.Elem()
	}
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, false, nil
	}
	schema, err := inferSchemaReflectCached(v.Type())
	if err != nil {
		return nil, false, err
	}
	return &StructSaver{
		Struct:   x,
		InsertID: insertID,
		Schema:   schema,
	}, true, nil
}

func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
	req, err := u.newInsertRequest(src)
	if err != nil {
		return err
	}
	if req == nil {
		return nil
	}
	call := u.t.c.bqs.Tabledata.InsertAll(u.t.ProjectID, u.t.DatasetID, u.t.TableID, req)
	call = call.Context(ctx)
	setClientHeader(call.Header())
	var res *bq.TableDataInsertAllResponse
	err = runWithRetry(ctx, func() (err error) {
		res, err = call.Do()
		return err
	})
	if err != nil {
		return err
	}
	return handleInsertErrors(res.InsertErrors, req.Rows)
}

func (u *Inserter) newInsertRequest(savers []ValueSaver) (*bq.TableDataInsertAllRequest, error) {
	if savers == nil { // If there are no rows, do nothing.
		return nil, nil
	}
	req := &bq.TableDataInsertAllRequest{
		TemplateSuffix:      u.TableTemplateSuffix,
		IgnoreUnknownValues: u.IgnoreUnknownValues,
		SkipInvalidRows:     u.SkipInvalidRows,
	}
	for _, saver := range savers {
		row, insertID, err := saver.Save()
		if err != nil {
			return nil, err
		}
		if insertID == NoDedupeID {
			// The user wants to opt out of sending a deduplication ID.
			insertID = ""
		} else if insertID == "" {
			insertID = randomIDFn()
		}
		m := make(map[string]bq.JsonValue)
		for k, v := range row {
			m[k] = bq.JsonValue(v)
		}
		req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{
			InsertId: insertID,
			Json:     m,
		})
	}
	return req, nil
}

func handleInsertErrors(ierrs []*bq.TableDataInsertAllResponseInsertErrors, rows []*bq.TableDataInsertAllRequestRows) error {
	if len(ierrs) == 0 {
		return nil
	}
	var errs PutMultiError
	for _, e := range ierrs {
		if int(e.Index) >= len(rows) {
			return fmt.Errorf("internal error: unexpected row index: %v", e.Index)
		}
		rie := RowInsertionError{
			InsertID: rows[e.Index].InsertId,
			RowIndex: int(e.Index),
		}
		for _, errp := range e.Errors {
			rie.Errors = append(rie.Errors, bqToError(errp))
		}
		errs = append(errs, rie)
	}
	return errs
}

// Uploader is an obsolete name for Inserter.
type Uploader = Inserter