1// Copyright 2019 The Go Cloud Development Kit Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package mongodocstore provides a docstore implementation for MongoDB 16// and MongoDB-compatible services hosted on-premise or by cloud providers, 17// including Amazon DocumentDB and Azure Cosmos DB. 18// 19// 20// URLs 21// 22// For docstore.OpenCollection, mongodocstore registers for the scheme "mongo". 23// The default URL opener will dial a Mongo server using the environment 24// variable "MONGO_SERVER_URL". 25// To customize the URL opener, or for more details on the URL format, 26// see URLOpener. 27// See https://gocloud.dev/concepts/urls/ for background information. 28// 29// 30// Action Lists 31// 32// mongodocstore uses the unordered BulkWrite call of the underlying driver for writes, and uses Find with a list of document IDs for Get. 33// (These implementation choices are subject to change.) 34// It calls the BeforeDo function once before each call to the underlying driver. The as function passed 35// to the BeforeDo function exposes the following types: 36// - Gets: *options.FindOptions 37// - writes: []mongo.WriteModel and *options.BulkWriteOptions 38// 39// As 40// 41// mongodocstore exposes the following types for As: 42// - Collection: *mongo.Collection 43// - Query.BeforeQuery: *options.FindOptions or bson.D (the filter for Delete and Update queries) 44// - DocumentIterator: *mongo.Cursor 45// - Error: mongo.CommandError, mongo.BulkWriteError, mongo.BulkWriteException 46// 47// 48// Special Considerations 49// 50// MongoDB represents times to millisecond precision, while Go's time.Time type has 51// nanosecond precision. To save time.Times to MongoDB without loss of precision, 52// save the result of calling UnixNano on the time. 53// 54// The official Go driver for MongoDB, go.mongodb.org/mongo-driver/mongo, lowercases 55// struct field names; other docstore drivers do not. This means that you have to choose 56// between interoperating with the MongoDB driver and interoperating with other docstore drivers. 57// See Options.LowercaseFields for more information. 58package mongodocstore // import "gocloud.dev/docstore/mongodocstore" 59 60// MongoDB reference manual: https://docs.mongodb.com/manual 61// Client documentation: https://godoc.org/go.mongodb.org/mongo-driver/mongo 62// 63// The client methods accept a document of type interface{}, 64// which is marshaled by the go.mongodb.org/mongo-driver/bson package. 65 66import ( 67 "context" 68 "reflect" 69 "strings" 70 71 "github.com/google/wire" 72 "go.mongodb.org/mongo-driver/bson" 73 "go.mongodb.org/mongo-driver/bson/primitive" 74 "go.mongodb.org/mongo-driver/mongo" 75 "go.mongodb.org/mongo-driver/mongo/options" 76 "gocloud.dev/docstore" 77 "gocloud.dev/docstore/driver" 78 "gocloud.dev/gcerrors" 79 "gocloud.dev/internal/gcerr" 80) 81 82// Dial returns a new mongoDB client that is connected to the server URI. 83func Dial(ctx context.Context, uri string) (*mongo.Client, error) { 84 opts := options.Client().ApplyURI(uri) 85 if err := opts.Validate(); err != nil { 86 return nil, err 87 } 88 client, err := mongo.NewClient(opts) 89 if err != nil { 90 return nil, err 91 } 92 if err := client.Connect(ctx); err != nil { 93 return nil, err 94 } 95 return client, nil 96} 97 98// Set holds Wire providers for this package. 99var Set = wire.NewSet( 100 Dial, 101 wire.Struct(new(URLOpener), "Client"), 102) 103 104type collection struct { 105 coll *mongo.Collection 106 idField string 107 idFunc func(docstore.Document) interface{} 108 revisionField string 109 opts *Options 110} 111 112type Options struct { 113 // Lowercase all field names for document encoding, field selection, update 114 // modifications and queries. 115 // 116 // If false (the default), then struct fields and MongoDB document fields will 117 // have the same names. For example, a struct field F will correspond to a 118 // MongoDB document field "F". This setting matches the behavior of other 119 // docstore drivers, making code portable across services. 120 // 121 // If true, all fields correspond to lower-cased MongoDB document fields. The 122 // field name F will correspond to the MongoDB document field "f", for 123 // instance. Use this to make code that uses this package interoperate with 124 // code that uses the official Go client for MongoDB, 125 // go.mongodb.org/mongo-driver/mongo, which lowercases field names. 126 LowercaseFields bool 127 // The name of the field holding the document revision. 128 // Defaults to docstore.DefaultRevisionField. 129 RevisionField string 130 // Whether Query.Update writes a new revision into the updated documents. 131 // The default is false, meaning that a revision will be written to all 132 // documents that satisfy the query's conditions. Set to true if and only if 133 // the collection's documents do not have revision fields. 134 NoWriteQueryUpdateRevisions bool 135} 136 137// OpenCollection opens a MongoDB collection for use with Docstore. 138// The idField argument is the name of the document field to use for the document ID 139// (MongoDB's _id field). If it is empty, the field "_id" will be used. 140func OpenCollection(mcoll *mongo.Collection, idField string, opts *Options) (*docstore.Collection, error) { 141 dc, err := newCollection(mcoll, idField, nil, opts) 142 if err != nil { 143 return nil, err 144 } 145 return docstore.NewCollection(dc), nil 146} 147 148// OpenCollectionWithIDFunc opens a MongoDB collection for use with Docstore. 149// The idFunc argument is function that accepts a document and returns the value to 150// be used for the document ID (MongoDB's _id field). IDFunc should return nil if the 151// document is missing the information to construct an ID. This will cause all 152// actions, even Create, to fail. 153func OpenCollectionWithIDFunc(mcoll *mongo.Collection, idFunc func(docstore.Document) interface{}, opts *Options) (*docstore.Collection, error) { 154 dc, err := newCollection(mcoll, "", idFunc, opts) 155 if err != nil { 156 return nil, err 157 } 158 return docstore.NewCollection(dc), nil 159} 160 161func newCollection(mcoll *mongo.Collection, idField string, idFunc func(docstore.Document) interface{}, opts *Options) (*collection, error) { 162 if opts == nil { 163 opts = &Options{} 164 } 165 if opts.RevisionField == "" { 166 opts.RevisionField = docstore.DefaultRevisionField 167 } 168 c := &collection{ 169 coll: mcoll, 170 idField: idField, 171 idFunc: idFunc, 172 revisionField: opts.RevisionField, 173 opts: opts, 174 } 175 if c.idField == "" && c.idFunc == nil { 176 c.idField = mongoIDField 177 } 178 179 if opts.LowercaseFields { 180 c.idField = strings.ToLower(c.idField) 181 c.revisionField = strings.ToLower(c.revisionField) 182 } 183 return c, nil 184} 185 186func (c *collection) Key(doc driver.Document) (interface{}, error) { 187 if c.idField != "" { 188 id, _ := doc.GetField(c.idField) 189 return id, nil // missing field is not an error 190 } 191 id := c.idFunc(doc.Origin) 192 if id == nil || driver.IsEmptyValue(reflect.ValueOf(id)) { 193 return nil, gcerr.Newf(gcerr.InvalidArgument, nil, "missing document key") 194 } 195 return id, nil 196} 197 198func (c *collection) RevisionField() string { 199 return c.opts.RevisionField 200} 201 202// From https://docs.mongodb.com/manual/core/document: "The field name _id is 203// reserved for use as a primary key; its value must be unique in the collection, is 204// immutable, and may be of any type other than an array." 205const mongoIDField = "_id" 206 207func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError { 208 errs := make([]error, len(actions)) 209 beforeGets, gets, writes, afterGets := driver.GroupActions(actions) 210 c.runGets(ctx, beforeGets, errs, opts) 211 ch := make(chan []error) 212 go func() { ch <- c.bulkWrite(ctx, writes, errs, opts) }() 213 c.runGets(ctx, gets, errs, opts) 214 writeErrs := <-ch 215 c.runGets(ctx, afterGets, errs, opts) 216 alerr := driver.NewActionListError(errs) 217 for _, werr := range writeErrs { 218 alerr = append(alerr, indexedError{-1, werr}) 219 } 220 return alerr 221} 222 223type indexedError = struct { 224 Index int 225 Err error 226} 227 228func (c *collection) runGets(ctx context.Context, gets []*driver.Action, errs []error, opts *driver.RunActionsOptions) { 229 // TODO(shantuo): figure out a reasonable batch size, there is no hard limit on 230 // the item number or filter string length. The limit for bulk write batch size 231 // is 100,000. 232 for _, group := range driver.GroupByFieldPath(gets) { 233 c.bulkFind(ctx, group, errs, opts) 234 } 235} 236 237func (c *collection) bulkFind(ctx context.Context, gets []*driver.Action, errs []error, dopts *driver.RunActionsOptions) { 238 // errors need to be mapped to the actions' indices. 239 setErr := func(err error) { 240 for _, get := range gets { 241 if errs[get.Index] == nil { 242 errs[get.Index] = err 243 } 244 } 245 } 246 247 opts := options.Find() 248 if len(gets[0].FieldPaths) > 0 { 249 opts.Projection = c.projectionDoc(gets[0].FieldPaths) 250 } 251 ids := bson.A{} 252 idToAction := map[interface{}]*driver.Action{} 253 for _, a := range gets { 254 id, err := encodeValue(a.Key) 255 if err != nil { 256 errs[a.Index] = err 257 } else { 258 ids = append(ids, id) 259 idToAction[id] = a 260 } 261 } 262 if dopts.BeforeDo != nil { 263 if err := dopts.BeforeDo(driver.AsFunc(opts)); err != nil { 264 setErr(err) 265 return 266 } 267 } 268 cursor, err := c.coll.Find(ctx, bson.D{bson.E{Key: mongoIDField, Value: bson.D{{Key: "$in", Value: ids}}}}, opts) 269 if err != nil { 270 setErr(err) 271 return 272 } 273 defer cursor.Close(ctx) 274 275 found := make(map[*driver.Action]bool) 276 for cursor.Next(ctx) { 277 var m map[string]interface{} 278 if err := cursor.Decode(&m); err != nil { 279 continue 280 } 281 a := idToAction[m[mongoIDField]] 282 errs[a.Index] = decodeDoc(m, a.Doc, c.idField, c.opts.LowercaseFields) 283 found[a] = true 284 } 285 for _, a := range gets { 286 if !found[a] { 287 errs[a.Index] = gcerr.Newf(gcerr.NotFound, nil, "item with key %v not found", a.Key) 288 } 289 } 290} 291 292// Construct a mongo "projection document" from field paths. 293// Always include the revision field. 294func (c *collection) projectionDoc(fps [][]string) bson.D { 295 proj := bson.D{{Key: c.revisionField, Value: 1}} 296 for _, fp := range fps { 297 path := c.toMongoFieldPath(fp) 298 if path != c.revisionField { 299 proj = append(proj, bson.E{Key: path, Value: 1}) 300 } 301 } 302 return proj 303} 304 305func (c *collection) toMongoFieldPath(fp []string) string { 306 if c.opts.LowercaseFields { 307 sliceToLower(fp) 308 } 309 return strings.Join(fp, ".") 310} 311 312func sliceToLower(s []string) { 313 for i, e := range s { 314 s[i] = strings.ToLower(e) 315 } 316} 317 318func (c *collection) prepareCreate(a *driver.Action) (mdoc, createdID interface{}, rev string, err error) { 319 id := a.Key 320 if id == nil { 321 // Create a unique ID here. (The MongoDB Go client does this for us when calling InsertOne, 322 // but not for BulkWrite.) 323 id = primitive.NewObjectID() 324 createdID = id 325 } else { 326 id, err = encodeValue(id) 327 if err != nil { 328 return nil, nil, "", err 329 } 330 } 331 mdoc, rev, err = c.encodeDoc(a.Doc, id) 332 if err != nil { 333 return nil, nil, "", err 334 } 335 return mdoc, createdID, rev, nil 336} 337 338func (c *collection) prepareReplace(a *driver.Action) (filter bson.D, mdoc map[string]interface{}, rev string, err error) { 339 id, err := encodeValue(a.Key) 340 if err != nil { 341 return nil, nil, "", err 342 } 343 filter, _, err = c.makeFilter(id, a.Doc) 344 if err != nil { 345 return nil, nil, "", err 346 } 347 mdoc, rev, err = c.encodeDoc(a.Doc, id) 348 if err != nil { 349 return nil, nil, "", err 350 } 351 return filter, mdoc, rev, nil 352} 353 354// encodeDoc encodes doc and sets its ID to the encoded value id. It also creates a new revision and sets it. 355// It returns the encoded document and the new revision. 356func (c *collection) encodeDoc(doc driver.Document, id interface{}) (map[string]interface{}, string, error) { 357 mdoc, err := encodeDoc(doc, c.opts.LowercaseFields) 358 if err != nil { 359 return nil, "", err 360 } 361 if id != nil { 362 if c.idField != "" { 363 delete(mdoc, c.idField) 364 } 365 mdoc[mongoIDField] = id 366 } 367 var rev string 368 if c.hasField(doc, c.revisionField) { 369 rev = driver.UniqueString() 370 mdoc[c.revisionField] = rev 371 } 372 return mdoc, rev, nil 373} 374 375func (c *collection) prepareUpdate(a *driver.Action) (filter bson.D, updateDoc map[string]bson.D, rev string, err error) { 376 id, err := encodeValue(a.Key) 377 if err != nil { 378 return nil, nil, "", err 379 } 380 filter, _, err = c.makeFilter(id, a.Doc) 381 if err != nil { 382 return nil, nil, "", err 383 } 384 updateDoc, rev, err = c.newUpdateDoc(a.Mods, c.hasField(a.Doc, c.revisionField)) 385 if err != nil { 386 return nil, nil, "", err 387 } 388 return filter, updateDoc, rev, nil 389} 390 391func (c *collection) newUpdateDoc(mods []driver.Mod, writeRevision bool) (map[string]bson.D, string, error) { 392 var ( 393 sets bson.D 394 unsets bson.D 395 incs bson.D 396 ) 397 for _, m := range mods { 398 key := c.toMongoFieldPath(m.FieldPath) 399 if m.Value == nil { 400 unsets = append(unsets, bson.E{Key: key, Value: ""}) 401 } else if inc, ok := m.Value.(driver.IncOp); ok { 402 val, err := encodeValue(inc.Amount) 403 if err != nil { 404 return nil, "", err 405 } 406 incs = append(incs, bson.E{Key: key, Value: val}) 407 } else { 408 val, err := encodeValue(m.Value) 409 if err != nil { 410 return nil, "", err 411 } 412 sets = append(sets, bson.E{Key: key, Value: val}) 413 } 414 } 415 updateDoc := map[string]bson.D{} 416 var rev string 417 if writeRevision { 418 rev = driver.UniqueString() 419 sets = append(sets, bson.E{Key: c.revisionField, Value: rev}) 420 } 421 updateDoc["$set"] = sets 422 if len(unsets) > 0 { 423 updateDoc["$unset"] = unsets 424 } 425 if len(incs) > 0 { 426 updateDoc["$inc"] = incs 427 } 428 return updateDoc, rev, nil 429} 430 431// makeFilter constructs a filter using the given encoded id and the document's revision field, if any. 432func (c *collection) makeFilter(id interface{}, doc driver.Document) (filter bson.D, rev interface{}, err error) { 433 rev, err = doc.GetField(c.revisionField) 434 if err != nil && gcerrors.Code(err) != gcerrors.NotFound { 435 return nil, nil, err 436 } 437 // Only select the document with the given ID. 438 filter = bson.D{bson.E{Key: "_id", Value: id}} 439 // If the given document has a revision, it must match the stored document. 440 if rev != nil { 441 filter = append(filter, bson.E{Key: c.revisionField, Value: rev}) 442 } 443 return filter, rev, nil 444} 445 446// bulkWrite calls the Mongo driver's BulkWrite RPC in unordered mode with the 447// actions, which must be writes. 448// errs is the slice of errors indexed by the position of the action in the original 449// action list. bulkWrite populates this slice. In addition, bulkWrite returns a list 450// of errors that cannot be attributed to any single action. 451func (c *collection) bulkWrite(ctx context.Context, actions []*driver.Action, errs []error, dopts *driver.RunActionsOptions) []error { 452 var ( 453 models []mongo.WriteModel 454 modelActions []*driver.Action // corresponding action for each model 455 newIDs []interface{} // new IDs for Create actions, corresponding to models slice 456 revs []string // new revisions, corresponding to models slice 457 nDeletes int64 458 nNonCreateWrite int64 // total operations expected from Put, Replace and Update 459 ) 460 for _, a := range actions { 461 var m mongo.WriteModel 462 var err error 463 var newID interface{} 464 var rev string 465 switch a.Kind { 466 case driver.Create: 467 m, newID, rev, err = c.newCreateModel(a) 468 case driver.Delete: 469 m, err = c.newDeleteModel(a) 470 if err == nil { 471 nDeletes++ 472 } 473 case driver.Replace, driver.Put: 474 m, rev, err = c.newReplaceModel(a, a.Kind == driver.Put) 475 if err == nil { 476 nNonCreateWrite++ 477 } 478 case driver.Update: 479 m, rev, err = c.newUpdateModel(a) 480 if err == nil && m != nil { 481 nNonCreateWrite++ 482 } 483 default: 484 err = gcerr.Newf(gcerr.Internal, nil, "bad action %+v", a) 485 } 486 if err != nil { 487 errs[a.Index] = err 488 } else if m != nil { // m can be nil for a no-op update 489 models = append(models, m) 490 modelActions = append(modelActions, a) 491 newIDs = append(newIDs, newID) 492 revs = append(revs, rev) 493 } 494 } 495 if len(models) == 0 { 496 return nil 497 } 498 499 bopts := options.BulkWrite().SetOrdered(false) 500 if dopts.BeforeDo != nil { 501 asFunc := func(target interface{}) bool { 502 switch t := target.(type) { 503 case *[]mongo.WriteModel: 504 *t = models 505 case **options.BulkWriteOptions: 506 *t = bopts 507 default: 508 return false 509 } 510 return true 511 } 512 if err := dopts.BeforeDo(asFunc); err != nil { 513 return []error{err} 514 } 515 } 516 517 // TODO(jba): improve independent execution. I think that even if BulkWrite returns an error, 518 // some of the actions may have succeeded. 519 var reterrs []error 520 res, err := c.coll.BulkWrite(ctx, models, bopts) 521 if err != nil { 522 bwe, ok := err.(mongo.BulkWriteException) 523 if !ok { // assume everything failed with this error 524 return []error{err} 525 } 526 // The returned indexes of the WriteErrors are wrong. See https://jira.mongodb.org/browse/GODRIVER-1028. 527 // Until it's fixed, use negative values for the indexes in the errors we return. 528 for _, w := range bwe.WriteErrors { 529 reterrs = append(reterrs, gcerr.Newf(translateMongoCode(w.Code), w, "%s", w.Message)) 530 } 531 return reterrs 532 } 533 for i, newID := range newIDs { 534 if newID == nil { 535 continue 536 } 537 a := modelActions[i] 538 if err := a.Doc.SetField(c.idField, newID); err != nil { 539 errs[a.Index] = err 540 } 541 } 542 for i, rev := range revs { 543 a := modelActions[i] 544 if rev != "" && c.hasField(a.Doc, c.revisionField) { 545 if err := a.Doc.SetField(c.revisionField, rev); err != nil && errs[a.Index] == nil { 546 errs[a.Index] = err 547 } 548 } 549 } 550 if res.DeletedCount != nDeletes { 551 // Some Delete actions failed. It's not an error if a Delete failed because 552 // the document didn't exist, but it is an error if it failed because of a 553 // precondition mismatch. Find all the documents with revisions we tried to delete; if 554 // any are still present, that's an error. 555 c.determineDeleteErrors(ctx, models, modelActions, errs) 556 } 557 if res.MatchedCount+res.UpsertedCount != nNonCreateWrite { 558 reterrs = append(reterrs, gcerr.Newf(gcerr.NotFound, nil, "some writes failed (replaced %d, upserted %d, out of total %d)", res.MatchedCount, res.UpsertedCount, nNonCreateWrite)) 559 } 560 return reterrs 561} 562 563func (c *collection) determineDeleteErrors(ctx context.Context, models []mongo.WriteModel, actions []*driver.Action, errs []error) { 564 // TODO(jba): do this concurrently. 565 for i, m := range models { 566 if dm, ok := m.(*mongo.DeleteOneModel); ok { 567 filter := dm.Filter.(bson.D) 568 if len(filter) > 1 { 569 // Delete with both ID and revision. See if the document is still there. 570 idOnlyFilter := filter[:1] 571 // TODO(shantuo): use Find instead of FindOne. 572 res := c.coll.FindOne(ctx, idOnlyFilter) 573 574 // Assume an error means the document wasn't found. 575 // That means either that it was deleted successfully, or that it never 576 // existed. Either way, it's not an error. 577 // TODO(jba): distinguish between not found and other errors. 578 if res.Err() == nil { 579 // The document exists, but we didn't delete it: assume we had the wrong 580 // revision. 581 errs[actions[i].Index] = gcerr.Newf(gcerr.FailedPrecondition, nil, 582 "wrong revision for document with ID %v", actions[i].Key) 583 } 584 } 585 } 586 } 587} 588 589func (c *collection) newCreateModel(a *driver.Action) (*mongo.InsertOneModel, interface{}, string, error) { 590 mdoc, createdID, rev, err := c.prepareCreate(a) 591 if err != nil { 592 return nil, nil, "", err 593 } 594 return &mongo.InsertOneModel{Document: mdoc}, createdID, rev, nil 595} 596 597func (c *collection) newDeleteModel(a *driver.Action) (*mongo.DeleteOneModel, error) { 598 id, err := encodeValue(a.Key) 599 if err != nil { 600 return nil, err 601 } 602 filter, _, err := c.makeFilter(id, a.Doc) 603 if err != nil { 604 return nil, err 605 } 606 return &mongo.DeleteOneModel{Filter: filter}, nil 607} 608 609func (c *collection) newReplaceModel(a *driver.Action, upsert bool) (*mongo.ReplaceOneModel, string, error) { 610 filter, mdoc, rev, err := c.prepareReplace(a) 611 if err != nil { 612 return nil, "", err 613 } 614 return &mongo.ReplaceOneModel{ 615 Filter: filter, 616 Replacement: mdoc, 617 Upsert: &upsert, 618 }, rev, nil 619} 620 621func (c *collection) newUpdateModel(a *driver.Action) (*mongo.UpdateOneModel, string, error) { 622 filter, updateDoc, rev, err := c.prepareUpdate(a) 623 if err != nil { 624 return nil, "", err 625 } 626 if filter == nil { // no-op 627 return nil, "", nil 628 } 629 return &mongo.UpdateOneModel{Filter: filter, Update: updateDoc}, rev, nil 630} 631 632// RevisionToBytes implements driver.RevisionToBytes. 633func (c *collection) RevisionToBytes(rev interface{}) ([]byte, error) { 634 s, ok := rev.(string) 635 if !ok { 636 return nil, gcerr.Newf(gcerr.InvalidArgument, nil, "revision %v of type %[1]T is not a string", rev) 637 } 638 return []byte(s), nil 639} 640 641func (c *collection) hasField(doc driver.Document, field string) bool { 642 if c.opts.LowercaseFields { 643 return doc.HasFieldFold(field) 644 } 645 return doc.HasField(field) 646} 647 648// BytesToRevision implements driver.BytesToRevision. 649func (c *collection) BytesToRevision(b []byte) (interface{}, error) { 650 return string(b), nil 651} 652 653// As implements driver.As. 654func (c *collection) As(i interface{}) bool { 655 p, ok := i.(**mongo.Collection) 656 if !ok { 657 return false 658 } 659 *p = c.coll 660 return true 661} 662 663// ErrorAs implements driver.Collection.ErrorAs 664func (c *collection) ErrorAs(err error, i interface{}) bool { 665 switch e := err.(type) { 666 case mongo.CommandError: 667 if p, ok := i.(*mongo.CommandError); ok { 668 *p = e 669 return true 670 } 671 case mongo.BulkWriteError: 672 if p, ok := i.(*mongo.BulkWriteError); ok { 673 *p = e 674 return true 675 } 676 case mongo.BulkWriteException: 677 if p, ok := i.(*mongo.BulkWriteException); ok { 678 *p = e 679 return true 680 } 681 } 682 return false 683} 684 685// ErrorCode implements driver.Collection.ErrorCode. 686func (c *collection) ErrorCode(err error) gcerrors.ErrorCode { 687 if g, ok := err.(*gcerr.Error); ok { 688 return g.Code 689 } 690 if err == mongo.ErrNoDocuments { 691 return gcerrors.NotFound 692 } 693 if wexc, ok := err.(mongo.WriteException); ok && len(wexc.WriteErrors) > 0 { 694 return translateMongoCode(wexc.WriteErrors[0].Code) 695 } 696 return gcerrors.Unknown 697} 698 699// Close implements driver.Collection.Close. 700func (c *collection) Close() error { return nil } 701 702// Error code for a write error when no documents match a filter. 703// (The Go mongo driver doesn't define an exported constant for this.) 704const mongoDupKeyCode = 11000 705 706func translateMongoCode(code int) gcerrors.ErrorCode { 707 switch code { 708 case mongoDupKeyCode: 709 return gcerrors.AlreadyExists 710 default: 711 return gcerrors.Unknown 712 } 713} 714