// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"reflect"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// An Inserter does streaming inserts into a BigQuery table.
// It is safe for concurrent use.
type Inserter struct {
	t *Table

	// SkipInvalidRows causes rows containing invalid data to be silently
	// ignored. The default value is false, which causes the entire request to
	// fail if there is an attempt to insert an invalid row.
	SkipInvalidRows bool

	// IgnoreUnknownValues causes values not matching the schema to be ignored.
	// The default value is false, which causes records containing such values
	// to be treated as invalid records.
	IgnoreUnknownValues bool

	// A TableTemplateSuffix allows Inserters to create tables automatically.
	//
	// Experimental: this option is experimental and may be modified or removed in future versions,
	// regardless of any other documented package stability guarantees.
	//
	// When you specify a suffix, the table you upload data to
	// will be used as a template for creating a new table, with the same schema,
	// called <table> + <suffix>.
	//
	// More information is available at
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
	TableTemplateSuffix string
}
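
// The sketch below is illustrative only and is not part of the package API: it
// shows how a caller might configure the options above before streaming rows.
// The dataset name, table name, suffix, and row type are assumptions made for
// the example.
func sketchConfigureInserter(ctx context.Context, client *Client) error {
	ins := client.Dataset("my_dataset").Table("my_table").Inserter()
	// Tolerate individual bad rows and unknown fields instead of failing the
	// whole request.
	ins.SkipInvalidRows = true
	ins.IgnoreUnknownValues = true
	// Stream into an automatically created table, "my_table" + "_shard1",
	// using "my_table" as the schema template.
	ins.TableTemplateSuffix = "_shard1"
	type row struct {
		Name string
		Num  int
	}
	return ins.Put(ctx, row{Name: "n1", Num: 7})
}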

// Inserter returns an Inserter that can be used to append rows to t.
// The returned Inserter may optionally be further configured before its Put method is called.
//
// To stream rows into a date-partitioned table at a particular date, add the
// $yyyymmdd suffix to the table name when constructing the Table.
func (t *Table) Inserter() *Inserter {
	return &Inserter{t: t}
}
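
// The sketch below is illustrative only and is not part of the package API: it
// shows the $yyyymmdd decorator described above, streaming rows into a single
// partition of a date-partitioned table. The dataset name, table name, and
// date are assumptions made for the example.
func sketchPartitionedInsert(ctx context.Context, client *Client, rows []ValueSaver) error {
	// Rows go into the 2024-01-15 partition of my_partitioned_table.
	ins := client.Dataset("my_dataset").Table("my_partitioned_table$20240115").Inserter()
	return ins.Put(ctx, rows)
}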

// Uploader calls Inserter.
//
// Deprecated: use Table.Inserter instead.
func (t *Table) Uploader() *Inserter { return t.Inserter() }

// Put uploads one or more rows to the BigQuery service.
//
// If src is a ValueSaver, then its Save method is called to produce a row for uploading.
//
// If src is a struct or pointer to a struct, then a schema is inferred from it
// and used to create a StructSaver. The InsertID of the StructSaver will be
// empty.
//
// If src is a slice of ValueSavers, structs, or struct pointers, then each
// element of the slice is treated as above, and multiple rows are uploaded.
//
// Put returns a PutMultiError if one or more rows failed to be uploaded.
// The PutMultiError contains a RowInsertionError for each failed row.
//
// Put will retry on temporary errors (see
// https://cloud.google.com/bigquery/troubleshooting-errors). This can result
// in duplicate rows if you do not use insert IDs. Also, if the error persists,
// the call will retry indefinitely. Pass a context with a timeout to prevent
// hanging calls.
func (u *Inserter) Put(ctx context.Context, src interface{}) (err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Inserter.Put")
	defer func() { trace.EndSpan(ctx, err) }()

	savers, err := valueSavers(src)
	if err != nil {
		return err
	}
	return u.putMulti(ctx, savers)
}
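
// The sketch below is illustrative only and is not part of the package API: it
// shows Put with a slice of struct pointers, for which a schema is inferred
// automatically. The score type, dataset name, and table name are assumptions
// made for the example; pass a context with a deadline so a persistently
// failing Put cannot retry forever.
func sketchPutStructs(ctx context.Context, client *Client) error {
	type score struct {
		Name string
		Num  int
	}
	ins := client.Dataset("my_dataset").Table("my_table").Inserter()
	rows := []*score{
		{Name: "n1", Num: 12},
		{Name: "n2", Num: 31},
	}
	// Each element becomes one row; insert IDs are generated automatically.
	return ins.Put(ctx, rows)
}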

func valueSavers(src interface{}) ([]ValueSaver, error) {
	saver, ok, err := toValueSaver(src)
	if err != nil {
		return nil, err
	}
	if ok {
		return []ValueSaver{saver}, nil
	}
	srcVal := reflect.ValueOf(src)
	if srcVal.Kind() != reflect.Slice {
		return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src)
	}
	var savers []ValueSaver
	for i := 0; i < srcVal.Len(); i++ {
		s := srcVal.Index(i).Interface()
		saver, ok, err := toValueSaver(s)
		if err != nil {
			return nil, err
		}
		if !ok {
			return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s)
		}
		savers = append(savers, saver)
	}
	return savers, nil
}
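
// The sketch below is illustrative only and is not part of the package API: it
// shows the slice handling above with a []interface{} mixing struct values and
// struct pointers in a single Put call. The entry type, dataset name, and
// table name are assumptions made for the example.
func sketchPutMixedSlice(ctx context.Context, client *Client) error {
	type entry struct {
		Name string
	}
	ins := client.Dataset("my_dataset").Table("my_table").Inserter()
	src := []interface{}{
		entry{Name: "a"},
		&entry{Name: "b"},
	}
	return ins.Put(ctx, src)
}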

// Make a ValueSaver from x, which must implement ValueSaver already
// or be a struct or pointer to struct.
func toValueSaver(x interface{}) (ValueSaver, bool, error) {
	if _, ok := x.(StructSaver); ok {
		return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver")
	}
	var insertID string
	// Handle StructSavers specially so we can infer the schema if necessary.
	if ss, ok := x.(*StructSaver); ok && ss.Schema == nil {
		x = ss.Struct
		insertID = ss.InsertID
		// Fall through so we can infer the schema.
	}
	if saver, ok := x.(ValueSaver); ok {
		return saver, ok, nil
	}
	v := reflect.ValueOf(x)
	// Support Put with []interface{}
	if v.Kind() == reflect.Interface {
		v = v.Elem()
	}
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, false, nil
	}
	schema, err := inferSchemaReflectCached(v.Type())
	if err != nil {
		return nil, false, err
	}
	return &StructSaver{
		Struct:   x,
		InsertID: insertID,
		Schema:   schema,
	}, true, nil
}
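
// sketchSaver is illustrative only and is not part of the package API: it
// implements ValueSaver directly, so toValueSaver returns it unchanged and no
// schema inference is needed. Supplying a stable insert ID from Save allows
// best-effort deduplication if Put retries. The field and column names are
// assumptions made for the example.
type sketchSaver struct {
	ID   string
	Name string
	Num  int
}

// Save implements ValueSaver for sketchSaver.
func (s *sketchSaver) Save() (map[string]Value, string, error) {
	return map[string]Value{
		"name": s.Name,
		"num":  s.Num,
	}, s.ID, nil
}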

func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
	req, err := u.newInsertRequest(src)
	if err != nil {
		return err
	}
	if req == nil {
		return nil
	}
	call := u.t.c.bqs.Tabledata.InsertAll(u.t.ProjectID, u.t.DatasetID, u.t.TableID, req)
	call = call.Context(ctx)
	setClientHeader(call.Header())
	var res *bq.TableDataInsertAllResponse
	// Retry the insertAll call on transient errors; a context deadline or
	// cancellation is the caller's way to bound these retries.
	err = runWithRetry(ctx, func() (err error) {
		res, err = call.Do()
		return err
	})
	if err != nil {
		return err
	}
	// Per-row failures are reported in the response body, not as an RPC error.
	return handleInsertErrors(res.InsertErrors, req.Rows)
}

func (u *Inserter) newInsertRequest(savers []ValueSaver) (*bq.TableDataInsertAllRequest, error) {
	if savers == nil { // If there are no rows, do nothing.
		return nil, nil
	}
	req := &bq.TableDataInsertAllRequest{
		TemplateSuffix:      u.TableTemplateSuffix,
		IgnoreUnknownValues: u.IgnoreUnknownValues,
		SkipInvalidRows:     u.SkipInvalidRows,
	}
	for _, saver := range savers {
		row, insertID, err := saver.Save()
		if err != nil {
			return nil, err
		}
		if insertID == "" {
			// Generate a random insert ID so the service can deduplicate
			// retried rows on a best-effort basis.
			insertID = randomIDFn()
		}
		m := make(map[string]bq.JsonValue)
		for k, v := range row {
			m[k] = bq.JsonValue(v)
		}
		req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{
			InsertId: insertID,
			Json:     m,
		})
	}
	return req, nil
}
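
// The sketch below is illustrative only and is not part of the package API: it
// shows a caller providing explicit insert IDs through StructSaver, so the
// randomIDFn fallback above is never used and the service can deduplicate
// retried rows against the caller-chosen IDs on a best-effort basis. The doc
// type, IDs, dataset name, and table name are assumptions made for the example.
func sketchPutWithInsertIDs(ctx context.Context, client *Client) error {
	type doc struct {
		Title string
		Views int
	}
	savers := []*StructSaver{
		// Schema is left nil, so Put infers it from the Struct field.
		{InsertID: "doc-0001", Struct: &doc{Title: "a", Views: 3}},
		{InsertID: "doc-0002", Struct: &doc{Title: "b", Views: 5}},
	}
	ins := client.Dataset("my_dataset").Table("my_table").Inserter()
	return ins.Put(ctx, savers)
}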

func handleInsertErrors(ierrs []*bq.TableDataInsertAllResponseInsertErrors, rows []*bq.TableDataInsertAllRequestRows) error {
	if len(ierrs) == 0 {
		return nil
	}
	var errs PutMultiError
	for _, e := range ierrs {
		if int(e.Index) >= len(rows) {
			return fmt.Errorf("internal error: unexpected row index: %v", e.Index)
		}
		rie := RowInsertionError{
			InsertID: rows[e.Index].InsertId,
			RowIndex: int(e.Index),
		}
		for _, errp := range e.Errors {
			rie.Errors = append(rie.Errors, bqToError(errp))
		}
		errs = append(errs, rie)
	}
	return errs
}
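
// The sketch below is illustrative only and is not part of the package API: it
// shows a caller unpacking the PutMultiError assembled above to see which rows
// were rejected. The dataset name and table name are assumptions made for the
// example.
func sketchHandlePutErrors(ctx context.Context, client *Client, rows []ValueSaver) error {
	ins := client.Dataset("my_dataset").Table("my_table").Inserter()
	err := ins.Put(ctx, rows)
	var pme PutMultiError
	if errors.As(err, &pme) {
		for _, rowErr := range pme {
			// Each RowInsertionError identifies the failed row by index and
			// insert ID and carries the per-row errors from the service.
			fmt.Printf("row %d (insert ID %q): %v\n", rowErr.RowIndex, rowErr.InsertID, rowErr.Errors)
		}
	}
	return err
}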

// Uploader is an obsolete name for Inserter.
type Uploader = Inserter