// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"reflect"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// NoDedupeID indicates a streaming insert row wants to opt out of best-effort
// deduplication. A row whose insert ID equals NoDedupeID is sent with an
// empty insert ID.
//
// It is EXPERIMENTAL and subject to change or removal without notice.
const NoDedupeID = "NoDedupeID"

// An Inserter does streaming inserts into a BigQuery table.
// It is safe for concurrent use.
type Inserter struct {
	// t is the destination table; its project, dataset and table IDs address
	// the insert call.
	t *Table

	// SkipInvalidRows causes rows containing invalid data to be silently
	// ignored. The default value is false, which causes the entire request to
	// fail if there is an attempt to insert an invalid row.
	SkipInvalidRows bool

	// IgnoreUnknownValues causes values not matching the schema to be ignored.
	// The default value is false, which causes records containing such values
	// to be treated as invalid records.
	IgnoreUnknownValues bool

	// A TableTemplateSuffix allows Inserters to create tables automatically.
	//
	// Experimental: this option is experimental and may be modified or removed in future versions,
	// regardless of any other documented package stability guarantees. In general,
	// the BigQuery team recommends the use of partitioned tables over sharding
	// tables by suffix.
	//
	// When you specify a suffix, the table you upload data to
	// will be used as a template for creating a new table, with the same schema,
	// called <table> + <suffix>.
	//
	// More information is available at
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
	TableTemplateSuffix string
}

// Inserter returns an Inserter that can be used to append rows to t.
// The returned Inserter may optionally be further configured before its Put method is called.
//
// To stream rows into a date-partitioned table at a particular date, add the
// $yyyymmdd suffix to the table name when constructing the Table.
func (t *Table) Inserter() *Inserter {
	return &Inserter{t: t}
}

// Uploader calls Inserter.
//
// Deprecated: use Table.Inserter instead.
func (t *Table) Uploader() *Inserter { return t.Inserter() }

// Put uploads one or more rows to the BigQuery service.
//
// If src is a ValueSaver, then its Save method is called to produce a row for uploading.
//
// If src is a struct or pointer to a struct, then a schema is inferred from it
// and used to create a StructSaver. The InsertID of the StructSaver will be
// empty.
//
// If src is a slice of ValueSavers, structs, or struct pointers, then each
// element of the slice is treated as above, and multiple rows are uploaded.
//
// Put returns a PutMultiError if one or more rows failed to be uploaded.
// The PutMultiError contains a RowInsertionError for each failed row.
//
// Put will retry on temporary errors (see
// https://cloud.google.com/bigquery/troubleshooting-errors). This can result
// in duplicate rows if you do not use insert IDs. Also, if the error persists,
// the call will run indefinitely. Pass a context with a timeout to prevent
// hanging calls.
95func (u *Inserter) Put(ctx context.Context, src interface{}) (err error) { 96 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Inserter.Put") 97 defer func() { trace.EndSpan(ctx, err) }() 98 99 savers, err := valueSavers(src) 100 if err != nil { 101 return err 102 } 103 return u.putMulti(ctx, savers) 104} 105 106func valueSavers(src interface{}) ([]ValueSaver, error) { 107 saver, ok, err := toValueSaver(src) 108 if err != nil { 109 return nil, err 110 } 111 if ok { 112 return []ValueSaver{saver}, nil 113 } 114 srcVal := reflect.ValueOf(src) 115 if srcVal.Kind() != reflect.Slice { 116 return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src) 117 118 } 119 var savers []ValueSaver 120 for i := 0; i < srcVal.Len(); i++ { 121 s := srcVal.Index(i).Interface() 122 saver, ok, err := toValueSaver(s) 123 if err != nil { 124 return nil, err 125 } 126 if !ok { 127 return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s) 128 } 129 savers = append(savers, saver) 130 } 131 return savers, nil 132} 133 134// Make a ValueSaver from x, which must implement ValueSaver already 135// or be a struct or pointer to struct. 136func toValueSaver(x interface{}) (ValueSaver, bool, error) { 137 if _, ok := x.(StructSaver); ok { 138 return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver") 139 } 140 var insertID string 141 // Handle StructSavers specially so we can infer the schema if necessary. 142 if ss, ok := x.(*StructSaver); ok && ss.Schema == nil { 143 x = ss.Struct 144 insertID = ss.InsertID 145 // Fall through so we can infer the schema. 
146 } 147 if saver, ok := x.(ValueSaver); ok { 148 return saver, ok, nil 149 } 150 v := reflect.ValueOf(x) 151 // Support Put with []interface{} 152 if v.Kind() == reflect.Interface { 153 v = v.Elem() 154 } 155 if v.Kind() == reflect.Ptr { 156 v = v.Elem() 157 } 158 if v.Kind() != reflect.Struct { 159 return nil, false, nil 160 } 161 schema, err := inferSchemaReflectCached(v.Type()) 162 if err != nil { 163 return nil, false, err 164 } 165 return &StructSaver{ 166 Struct: x, 167 InsertID: insertID, 168 Schema: schema, 169 }, true, nil 170} 171 172func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error { 173 req, err := u.newInsertRequest(src) 174 if err != nil { 175 return err 176 } 177 if req == nil { 178 return nil 179 } 180 call := u.t.c.bqs.Tabledata.InsertAll(u.t.ProjectID, u.t.DatasetID, u.t.TableID, req) 181 call = call.Context(ctx) 182 setClientHeader(call.Header()) 183 var res *bq.TableDataInsertAllResponse 184 err = runWithRetry(ctx, func() (err error) { 185 res, err = call.Do() 186 return err 187 }) 188 if err != nil { 189 return err 190 } 191 return handleInsertErrors(res.InsertErrors, req.Rows) 192} 193 194func (u *Inserter) newInsertRequest(savers []ValueSaver) (*bq.TableDataInsertAllRequest, error) { 195 if savers == nil { // If there are no rows, do nothing. 196 return nil, nil 197 } 198 req := &bq.TableDataInsertAllRequest{ 199 TemplateSuffix: u.TableTemplateSuffix, 200 IgnoreUnknownValues: u.IgnoreUnknownValues, 201 SkipInvalidRows: u.SkipInvalidRows, 202 } 203 for _, saver := range savers { 204 row, insertID, err := saver.Save() 205 if err != nil { 206 return nil, err 207 } 208 if insertID == NoDedupeID { 209 // User wants to opt-out of sending deduplication ID. 
210 insertID = "" 211 } else if insertID == "" { 212 insertID = randomIDFn() 213 } 214 m := make(map[string]bq.JsonValue) 215 for k, v := range row { 216 m[k] = bq.JsonValue(v) 217 } 218 req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{ 219 InsertId: insertID, 220 Json: m, 221 }) 222 } 223 return req, nil 224} 225 226func handleInsertErrors(ierrs []*bq.TableDataInsertAllResponseInsertErrors, rows []*bq.TableDataInsertAllRequestRows) error { 227 if len(ierrs) == 0 { 228 return nil 229 } 230 var errs PutMultiError 231 for _, e := range ierrs { 232 if int(e.Index) >= len(rows) { 233 return fmt.Errorf("internal error: unexpected row index: %v", e.Index) 234 } 235 rie := RowInsertionError{ 236 InsertID: rows[e.Index].InsertId, 237 RowIndex: int(e.Index), 238 } 239 for _, errp := range e.Errors { 240 rie.Errors = append(rie.Errors, bqToError(errp)) 241 } 242 errs = append(errs, rie) 243 } 244 return errs 245} 246 247// Uploader is an obsolete name for Inserter. 248type Uploader = Inserter 249