1// Copyright 2015 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bigquery 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "reflect" 22 23 "cloud.google.com/go/internal/trace" 24 bq "google.golang.org/api/bigquery/v2" 25) 26 27// An Inserter does streaming inserts into a BigQuery table. 28// It is safe for concurrent use. 29type Inserter struct { 30 t *Table 31 32 // SkipInvalidRows causes rows containing invalid data to be silently 33 // ignored. The default value is false, which causes the entire request to 34 // fail if there is an attempt to insert an invalid row. 35 SkipInvalidRows bool 36 37 // IgnoreUnknownValues causes values not matching the schema to be ignored. 38 // The default value is false, which causes records containing such values 39 // to be treated as invalid records. 40 IgnoreUnknownValues bool 41 42 // A TableTemplateSuffix allows Inserters to create tables automatically. 43 // 44 // Experimental: this option is experimental and may be modified or removed in future versions, 45 // regardless of any other documented package stability guarantees. 46 // 47 // When you specify a suffix, the table you upload data to 48 // will be used as a template for creating a new table, with the same schema, 49 // called <table> + <suffix>. 50 // 51 // More information is available at 52 // https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables 53 TableTemplateSuffix string 54} 55 56// Inserter returns an Inserter that can be used to append rows to t. 57// The returned Inserter may optionally be further configured before its Put method is called. 58// 59// To stream rows into a date-partitioned table at a particular date, add the 60// $yyyymmdd suffix to the table name when constructing the Table. 61func (t *Table) Inserter() *Inserter { 62 return &Inserter{t: t} 63} 64 65// Uploader calls Inserter. 66// Deprecated: use Table.Inserter instead. 67func (t *Table) Uploader() *Inserter { return t.Inserter() } 68 69// Put uploads one or more rows to the BigQuery service. 70// 71// If src is ValueSaver, then its Save method is called to produce a row for uploading. 72// 73// If src is a struct or pointer to a struct, then a schema is inferred from it 74// and used to create a StructSaver. The InsertID of the StructSaver will be 75// empty. 76// 77// If src is a slice of ValueSavers, structs, or struct pointers, then each 78// element of the slice is treated as above, and multiple rows are uploaded. 79// 80// Put returns a PutMultiError if one or more rows failed to be uploaded. 81// The PutMultiError contains a RowInsertionError for each failed row. 82// 83// Put will retry on temporary errors (see 84// https://cloud.google.com/bigquery/troubleshooting-errors). This can result 85// in duplicate rows if you do not use insert IDs. Also, if the error persists, 86// the call will run indefinitely. Pass a context with a timeout to prevent 87// hanging calls. 88func (u *Inserter) Put(ctx context.Context, src interface{}) (err error) { 89 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Inserter.Put") 90 defer func() { trace.EndSpan(ctx, err) }() 91 92 savers, err := valueSavers(src) 93 if err != nil { 94 return err 95 } 96 return u.putMulti(ctx, savers) 97} 98 99func valueSavers(src interface{}) ([]ValueSaver, error) { 100 saver, ok, err := toValueSaver(src) 101 if err != nil { 102 return nil, err 103 } 104 if ok { 105 return []ValueSaver{saver}, nil 106 } 107 srcVal := reflect.ValueOf(src) 108 if srcVal.Kind() != reflect.Slice { 109 return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src) 110 111 } 112 var savers []ValueSaver 113 for i := 0; i < srcVal.Len(); i++ { 114 s := srcVal.Index(i).Interface() 115 saver, ok, err := toValueSaver(s) 116 if err != nil { 117 return nil, err 118 } 119 if !ok { 120 return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s) 121 } 122 savers = append(savers, saver) 123 } 124 return savers, nil 125} 126 127// Make a ValueSaver from x, which must implement ValueSaver already 128// or be a struct or pointer to struct. 129func toValueSaver(x interface{}) (ValueSaver, bool, error) { 130 if _, ok := x.(StructSaver); ok { 131 return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver") 132 } 133 var insertID string 134 // Handle StructSavers specially so we can infer the schema if necessary. 135 if ss, ok := x.(*StructSaver); ok && ss.Schema == nil { 136 x = ss.Struct 137 insertID = ss.InsertID 138 // Fall through so we can infer the schema. 139 } 140 if saver, ok := x.(ValueSaver); ok { 141 return saver, ok, nil 142 } 143 v := reflect.ValueOf(x) 144 // Support Put with []interface{} 145 if v.Kind() == reflect.Interface { 146 v = v.Elem() 147 } 148 if v.Kind() == reflect.Ptr { 149 v = v.Elem() 150 } 151 if v.Kind() != reflect.Struct { 152 return nil, false, nil 153 } 154 schema, err := inferSchemaReflectCached(v.Type()) 155 if err != nil { 156 return nil, false, err 157 } 158 return &StructSaver{ 159 Struct: x, 160 InsertID: insertID, 161 Schema: schema, 162 }, true, nil 163} 164 165func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error { 166 req, err := u.newInsertRequest(src) 167 if err != nil { 168 return err 169 } 170 if req == nil { 171 return nil 172 } 173 call := u.t.c.bqs.Tabledata.InsertAll(u.t.ProjectID, u.t.DatasetID, u.t.TableID, req) 174 call = call.Context(ctx) 175 setClientHeader(call.Header()) 176 var res *bq.TableDataInsertAllResponse 177 err = runWithRetry(ctx, func() (err error) { 178 res, err = call.Do() 179 return err 180 }) 181 if err != nil { 182 return err 183 } 184 return handleInsertErrors(res.InsertErrors, req.Rows) 185} 186 187func (u *Inserter) newInsertRequest(savers []ValueSaver) (*bq.TableDataInsertAllRequest, error) { 188 if savers == nil { // If there are no rows, do nothing. 189 return nil, nil 190 } 191 req := &bq.TableDataInsertAllRequest{ 192 TemplateSuffix: u.TableTemplateSuffix, 193 IgnoreUnknownValues: u.IgnoreUnknownValues, 194 SkipInvalidRows: u.SkipInvalidRows, 195 } 196 for _, saver := range savers { 197 row, insertID, err := saver.Save() 198 if err != nil { 199 return nil, err 200 } 201 if insertID == "" { 202 insertID = randomIDFn() 203 } 204 m := make(map[string]bq.JsonValue) 205 for k, v := range row { 206 m[k] = bq.JsonValue(v) 207 } 208 req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{ 209 InsertId: insertID, 210 Json: m, 211 }) 212 } 213 return req, nil 214} 215 216func handleInsertErrors(ierrs []*bq.TableDataInsertAllResponseInsertErrors, rows []*bq.TableDataInsertAllRequestRows) error { 217 if len(ierrs) == 0 { 218 return nil 219 } 220 var errs PutMultiError 221 for _, e := range ierrs { 222 if int(e.Index) > len(rows) { 223 return fmt.Errorf("internal error: unexpected row index: %v", e.Index) 224 } 225 rie := RowInsertionError{ 226 InsertID: rows[e.Index].InsertId, 227 RowIndex: int(e.Index), 228 } 229 for _, errp := range e.Errors { 230 rie.Errors = append(rie.Errors, bqToError(errp)) 231 } 232 errs = append(errs, rie) 233 } 234 return errs 235} 236 237// Uploader is an obsolete name for Inserter. 238type Uploader = Inserter 239