1// Copyright 2019 The Go Cloud Development Kit Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package mongodocstore provides a docstore implementation for MongoDB
16// and MongoDB-compatible services hosted on-premise or by cloud providers,
17// including Amazon DocumentDB and Azure Cosmos DB.
18//
19//
20// URLs
21//
22// For docstore.OpenCollection, mongodocstore registers for the scheme "mongo".
23// The default URL opener will dial a Mongo server using the environment
24// variable "MONGO_SERVER_URL".
25// To customize the URL opener, or for more details on the URL format,
26// see URLOpener.
27// See https://gocloud.dev/concepts/urls/ for background information.
28//
29//
30// Action Lists
31//
32// mongodocstore uses the unordered BulkWrite call of the underlying driver for writes, and uses Find with a list of document IDs for Get.
33// (These implementation choices are subject to change.)
34// It calls the BeforeDo function once before each call to the underlying driver. The as function passed
35// to the BeforeDo function exposes the following types:
36//  - Gets: *options.FindOptions
//  - Writes: []mongo.WriteModel and *options.BulkWriteOptions
38//
39// As
40//
41// mongodocstore exposes the following types for As:
42//  - Collection: *mongo.Collection
43//  - Query.BeforeQuery: *options.FindOptions or bson.D (the filter for Delete and Update queries)
44//  - DocumentIterator: *mongo.Cursor
45//  - Error: mongo.CommandError, mongo.BulkWriteError, mongo.BulkWriteException
46//
47//
48// Special Considerations
49//
50// MongoDB represents times to millisecond precision, while Go's time.Time type has
51// nanosecond precision. To save time.Times to MongoDB without loss of precision,
52// save the result of calling UnixNano on the time.
53//
54// The official Go driver for MongoDB, go.mongodb.org/mongo-driver/mongo, lowercases
55// struct field names; other docstore drivers do not. This means that you have to choose
56// between interoperating with the MongoDB driver and interoperating with other docstore drivers.
57// See Options.LowercaseFields for more information.
58package mongodocstore // import "gocloud.dev/docstore/mongodocstore"
59
60// MongoDB reference manual: https://docs.mongodb.com/manual
61// Client documentation: https://godoc.org/go.mongodb.org/mongo-driver/mongo
62//
63// The client methods accept a document of type interface{},
64// which is marshaled by the go.mongodb.org/mongo-driver/bson package.
65
66import (
67	"context"
68	"reflect"
69	"strings"
70
71	"github.com/google/wire"
72	"go.mongodb.org/mongo-driver/bson"
73	"go.mongodb.org/mongo-driver/bson/primitive"
74	"go.mongodb.org/mongo-driver/mongo"
75	"go.mongodb.org/mongo-driver/mongo/options"
76	"gocloud.dev/docstore"
77	"gocloud.dev/docstore/driver"
78	"gocloud.dev/gcerrors"
79	"gocloud.dev/internal/gcerr"
80)
81
82// Dial returns a new mongoDB client that is connected to the server URI.
83func Dial(ctx context.Context, uri string) (*mongo.Client, error) {
84	opts := options.Client().ApplyURI(uri)
85	if err := opts.Validate(); err != nil {
86		return nil, err
87	}
88	client, err := mongo.NewClient(opts)
89	if err != nil {
90		return nil, err
91	}
92	if err := client.Connect(ctx); err != nil {
93		return nil, err
94	}
95	return client, nil
96}
97
// Set holds Wire providers for this package: the Dial function and a struct
// provider for URLOpener that fills in its Client field.
var Set = wire.NewSet(
	Dial,
	wire.Struct(new(URLOpener), "Client"),
)
103
// collection is the driver-level collection that is handed to
// docstore.NewCollection. Its fields are:
//   - coll: the underlying MongoDB collection used for Find and BulkWrite.
//   - idField: the name of the document field holding the ID; it is empty
//     when idFunc is used instead.
//   - idFunc: computes the ID from the original document when idField is empty.
//   - revisionField: the name of the revision field in stored documents
//     (lower-cased when Options.LowercaseFields is set).
//   - opts: the options the collection was created with, defaults applied.
type collection struct {
	coll          *mongo.Collection
	idField       string
	idFunc        func(docstore.Document) interface{}
	revisionField string
	opts          *Options
}
111
// Options holds optional settings for a collection opened with OpenCollection
// or OpenCollectionWithIDFunc. A nil *Options is treated as the zero value.
type Options struct {
	// Lowercase all field names for document encoding, field selection, update
	// modifications and queries.
	//
	// If false (the default), then struct fields and MongoDB document fields will
	// have the same names. For example, a struct field F will correspond to a
	// MongoDB document field "F". This setting matches the behavior of other
	// docstore drivers, making code portable across services.
	//
	// If true, all fields correspond to lower-cased MongoDB document fields. The
	// field name F will correspond to the MongoDB document field "f", for
	// instance. Use this to make code that uses this package interoperate with
	// code that uses the official Go client for MongoDB,
	// go.mongodb.org/mongo-driver/mongo, which lowercases field names.
	LowercaseFields bool
	// The name of the field holding the document revision.
	// Defaults to docstore.DefaultRevisionField.
	RevisionField string
	// Whether Query.Update should refrain from writing a new revision into the
	// updated documents.
	// The default is false, meaning that a revision will be written to all
	// documents that satisfy the query's conditions. Set to true if and only if
	// the collection's documents do not have revision fields.
	NoWriteQueryUpdateRevisions bool
}
136
137// OpenCollection opens a MongoDB collection for use with Docstore.
138// The idField argument is the name of the document field to use for the document ID
139// (MongoDB's _id field). If it is empty, the field "_id" will be used.
140func OpenCollection(mcoll *mongo.Collection, idField string, opts *Options) (*docstore.Collection, error) {
141	dc, err := newCollection(mcoll, idField, nil, opts)
142	if err != nil {
143		return nil, err
144	}
145	return docstore.NewCollection(dc), nil
146}
147
148// OpenCollectionWithIDFunc opens a MongoDB collection for use with Docstore.
149// The idFunc argument is function that accepts a document and returns the value to
150// be used for the document ID (MongoDB's _id field). IDFunc should return nil if the
151// document is missing the information to construct an ID. This will cause all
152// actions, even Create, to fail.
153func OpenCollectionWithIDFunc(mcoll *mongo.Collection, idFunc func(docstore.Document) interface{}, opts *Options) (*docstore.Collection, error) {
154	dc, err := newCollection(mcoll, "", idFunc, opts)
155	if err != nil {
156		return nil, err
157	}
158	return docstore.NewCollection(dc), nil
159}
160
161func newCollection(mcoll *mongo.Collection, idField string, idFunc func(docstore.Document) interface{}, opts *Options) (*collection, error) {
162	if opts == nil {
163		opts = &Options{}
164	}
165	if opts.RevisionField == "" {
166		opts.RevisionField = docstore.DefaultRevisionField
167	}
168	c := &collection{
169		coll:          mcoll,
170		idField:       idField,
171		idFunc:        idFunc,
172		revisionField: opts.RevisionField,
173		opts:          opts,
174	}
175	if c.idField == "" && c.idFunc == nil {
176		c.idField = mongoIDField
177	}
178
179	if opts.LowercaseFields {
180		c.idField = strings.ToLower(c.idField)
181		c.revisionField = strings.ToLower(c.revisionField)
182	}
183	return c, nil
184}
185
186func (c *collection) Key(doc driver.Document) (interface{}, error) {
187	if c.idField != "" {
188		id, _ := doc.GetField(c.idField)
189		return id, nil // missing field is not an error
190	}
191	id := c.idFunc(doc.Origin)
192	if id == nil || driver.IsEmptyValue(reflect.ValueOf(id)) {
193		return nil, gcerr.Newf(gcerr.InvalidArgument, nil, "missing document key")
194	}
195	return id, nil
196}
197
198func (c *collection) RevisionField() string {
199	return c.opts.RevisionField
200}
201
// mongoIDField is the name of MongoDB's primary-key field.
//
// From https://docs.mongodb.com/manual/core/document: "The field name _id is
// reserved for use as a primary key; its value must be unique in the collection, is
// immutable, and may be of any type other than an array."
const mongoIDField = "_id"
206
207func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
208	errs := make([]error, len(actions))
209	beforeGets, gets, writes, afterGets := driver.GroupActions(actions)
210	c.runGets(ctx, beforeGets, errs, opts)
211	ch := make(chan []error)
212	go func() { ch <- c.bulkWrite(ctx, writes, errs, opts) }()
213	c.runGets(ctx, gets, errs, opts)
214	writeErrs := <-ch
215	c.runGets(ctx, afterGets, errs, opts)
216	alerr := driver.NewActionListError(errs)
217	for _, werr := range writeErrs {
218		alerr = append(alerr, indexedError{-1, werr})
219	}
220	return alerr
221}
222
// indexedError is an alias for the element type appended to a
// driver.ActionListError: an error together with the index of the action it
// belongs to. RunActions uses index -1 for errors that are not attributed to a
// single action.
type indexedError = struct {
	Index int
	Err   error
}
227
228func (c *collection) runGets(ctx context.Context, gets []*driver.Action, errs []error, opts *driver.RunActionsOptions) {
229	// TODO(shantuo): figure out a reasonable batch size, there is no hard limit on
230	// the item number or filter string length. The limit for bulk write batch size
231	// is 100,000.
232	for _, group := range driver.GroupByFieldPath(gets) {
233		c.bulkFind(ctx, group, errs, opts)
234	}
235}
236
237func (c *collection) bulkFind(ctx context.Context, gets []*driver.Action, errs []error, dopts *driver.RunActionsOptions) {
238	// errors need to be mapped to the actions' indices.
239	setErr := func(err error) {
240		for _, get := range gets {
241			if errs[get.Index] == nil {
242				errs[get.Index] = err
243			}
244		}
245	}
246
247	opts := options.Find()
248	if len(gets[0].FieldPaths) > 0 {
249		opts.Projection = c.projectionDoc(gets[0].FieldPaths)
250	}
251	ids := bson.A{}
252	idToAction := map[interface{}]*driver.Action{}
253	for _, a := range gets {
254		id, err := encodeValue(a.Key)
255		if err != nil {
256			errs[a.Index] = err
257		} else {
258			ids = append(ids, id)
259			idToAction[id] = a
260		}
261	}
262	if dopts.BeforeDo != nil {
263		if err := dopts.BeforeDo(driver.AsFunc(opts)); err != nil {
264			setErr(err)
265			return
266		}
267	}
268	cursor, err := c.coll.Find(ctx, bson.D{bson.E{Key: mongoIDField, Value: bson.D{{Key: "$in", Value: ids}}}}, opts)
269	if err != nil {
270		setErr(err)
271		return
272	}
273	defer cursor.Close(ctx)
274
275	found := make(map[*driver.Action]bool)
276	for cursor.Next(ctx) {
277		var m map[string]interface{}
278		if err := cursor.Decode(&m); err != nil {
279			continue
280		}
281		a := idToAction[m[mongoIDField]]
282		errs[a.Index] = decodeDoc(m, a.Doc, c.idField, c.opts.LowercaseFields)
283		found[a] = true
284	}
285	for _, a := range gets {
286		if !found[a] {
287			errs[a.Index] = gcerr.Newf(gcerr.NotFound, nil, "item with key %v not found", a.Key)
288		}
289	}
290}
291
292// Construct a mongo "projection document" from field paths.
293// Always include the revision field.
294func (c *collection) projectionDoc(fps [][]string) bson.D {
295	proj := bson.D{{Key: c.revisionField, Value: 1}}
296	for _, fp := range fps {
297		path := c.toMongoFieldPath(fp)
298		if path != c.revisionField {
299			proj = append(proj, bson.E{Key: path, Value: 1})
300		}
301	}
302	return proj
303}
304
305func (c *collection) toMongoFieldPath(fp []string) string {
306	if c.opts.LowercaseFields {
307		sliceToLower(fp)
308	}
309	return strings.Join(fp, ".")
310}
311
// sliceToLower lower-cases every element of s in place.
func sliceToLower(s []string) {
	for i := range s {
		s[i] = strings.ToLower(s[i])
	}
}
317
318func (c *collection) prepareCreate(a *driver.Action) (mdoc, createdID interface{}, rev string, err error) {
319	id := a.Key
320	if id == nil {
321		// Create a unique ID here. (The MongoDB Go client does this for us when calling InsertOne,
322		// but not for BulkWrite.)
323		id = primitive.NewObjectID()
324		createdID = id
325	} else {
326		id, err = encodeValue(id)
327		if err != nil {
328			return nil, nil, "", err
329		}
330	}
331	mdoc, rev, err = c.encodeDoc(a.Doc, id)
332	if err != nil {
333		return nil, nil, "", err
334	}
335	return mdoc, createdID, rev, nil
336}
337
338func (c *collection) prepareReplace(a *driver.Action) (filter bson.D, mdoc map[string]interface{}, rev string, err error) {
339	id, err := encodeValue(a.Key)
340	if err != nil {
341		return nil, nil, "", err
342	}
343	filter, _, err = c.makeFilter(id, a.Doc)
344	if err != nil {
345		return nil, nil, "", err
346	}
347	mdoc, rev, err = c.encodeDoc(a.Doc, id)
348	if err != nil {
349		return nil, nil, "", err
350	}
351	return filter, mdoc, rev, nil
352}
353
354// encodeDoc encodes doc and sets its ID to the encoded value id. It also creates a new revision and sets it.
355// It returns the encoded document and the new revision.
356func (c *collection) encodeDoc(doc driver.Document, id interface{}) (map[string]interface{}, string, error) {
357	mdoc, err := encodeDoc(doc, c.opts.LowercaseFields)
358	if err != nil {
359		return nil, "", err
360	}
361	if id != nil {
362		if c.idField != "" {
363			delete(mdoc, c.idField)
364		}
365		mdoc[mongoIDField] = id
366	}
367	var rev string
368	if c.hasField(doc, c.revisionField) {
369		rev = driver.UniqueString()
370		mdoc[c.revisionField] = rev
371	}
372	return mdoc, rev, nil
373}
374
375func (c *collection) prepareUpdate(a *driver.Action) (filter bson.D, updateDoc map[string]bson.D, rev string, err error) {
376	id, err := encodeValue(a.Key)
377	if err != nil {
378		return nil, nil, "", err
379	}
380	filter, _, err = c.makeFilter(id, a.Doc)
381	if err != nil {
382		return nil, nil, "", err
383	}
384	updateDoc, rev, err = c.newUpdateDoc(a.Mods, c.hasField(a.Doc, c.revisionField))
385	if err != nil {
386		return nil, nil, "", err
387	}
388	return filter, updateDoc, rev, nil
389}
390
391func (c *collection) newUpdateDoc(mods []driver.Mod, writeRevision bool) (map[string]bson.D, string, error) {
392	var (
393		sets   bson.D
394		unsets bson.D
395		incs   bson.D
396	)
397	for _, m := range mods {
398		key := c.toMongoFieldPath(m.FieldPath)
399		if m.Value == nil {
400			unsets = append(unsets, bson.E{Key: key, Value: ""})
401		} else if inc, ok := m.Value.(driver.IncOp); ok {
402			val, err := encodeValue(inc.Amount)
403			if err != nil {
404				return nil, "", err
405			}
406			incs = append(incs, bson.E{Key: key, Value: val})
407		} else {
408			val, err := encodeValue(m.Value)
409			if err != nil {
410				return nil, "", err
411			}
412			sets = append(sets, bson.E{Key: key, Value: val})
413		}
414	}
415	updateDoc := map[string]bson.D{}
416	var rev string
417	if writeRevision {
418		rev = driver.UniqueString()
419		sets = append(sets, bson.E{Key: c.revisionField, Value: rev})
420	}
421	updateDoc["$set"] = sets
422	if len(unsets) > 0 {
423		updateDoc["$unset"] = unsets
424	}
425	if len(incs) > 0 {
426		updateDoc["$inc"] = incs
427	}
428	return updateDoc, rev, nil
429}
430
431// makeFilter constructs a filter using the given encoded id and the document's revision field, if any.
432func (c *collection) makeFilter(id interface{}, doc driver.Document) (filter bson.D, rev interface{}, err error) {
433	rev, err = doc.GetField(c.revisionField)
434	if err != nil && gcerrors.Code(err) != gcerrors.NotFound {
435		return nil, nil, err
436	}
437	// Only select the document with the given ID.
438	filter = bson.D{bson.E{Key: "_id", Value: id}}
439	// If the given document has a revision, it must match the stored document.
440	if rev != nil {
441		filter = append(filter, bson.E{Key: c.revisionField, Value: rev})
442	}
443	return filter, rev, nil
444}
445
446// bulkWrite calls the Mongo driver's BulkWrite RPC in unordered mode with the
447// actions, which must be writes.
448// errs is the slice of errors indexed by the position of the action in the original
449// action list. bulkWrite populates this slice. In addition, bulkWrite returns a list
450// of errors that cannot be attributed to any single action.
451func (c *collection) bulkWrite(ctx context.Context, actions []*driver.Action, errs []error, dopts *driver.RunActionsOptions) []error {
452	var (
453		models          []mongo.WriteModel
454		modelActions    []*driver.Action // corresponding action for each model
455		newIDs          []interface{}    // new IDs for Create actions, corresponding to models slice
456		revs            []string         // new revisions, corresponding to models slice
457		nDeletes        int64
458		nNonCreateWrite int64 // total operations expected from Put, Replace and Update
459	)
460	for _, a := range actions {
461		var m mongo.WriteModel
462		var err error
463		var newID interface{}
464		var rev string
465		switch a.Kind {
466		case driver.Create:
467			m, newID, rev, err = c.newCreateModel(a)
468		case driver.Delete:
469			m, err = c.newDeleteModel(a)
470			if err == nil {
471				nDeletes++
472			}
473		case driver.Replace, driver.Put:
474			m, rev, err = c.newReplaceModel(a, a.Kind == driver.Put)
475			if err == nil {
476				nNonCreateWrite++
477			}
478		case driver.Update:
479			m, rev, err = c.newUpdateModel(a)
480			if err == nil && m != nil {
481				nNonCreateWrite++
482			}
483		default:
484			err = gcerr.Newf(gcerr.Internal, nil, "bad action %+v", a)
485		}
486		if err != nil {
487			errs[a.Index] = err
488		} else if m != nil { // m can be nil for a no-op update
489			models = append(models, m)
490			modelActions = append(modelActions, a)
491			newIDs = append(newIDs, newID)
492			revs = append(revs, rev)
493		}
494	}
495	if len(models) == 0 {
496		return nil
497	}
498
499	bopts := options.BulkWrite().SetOrdered(false)
500	if dopts.BeforeDo != nil {
501		asFunc := func(target interface{}) bool {
502			switch t := target.(type) {
503			case *[]mongo.WriteModel:
504				*t = models
505			case **options.BulkWriteOptions:
506				*t = bopts
507			default:
508				return false
509			}
510			return true
511		}
512		if err := dopts.BeforeDo(asFunc); err != nil {
513			return []error{err}
514		}
515	}
516
517	// TODO(jba): improve independent execution. I think that even if BulkWrite returns an error,
518	// some of the actions may have succeeded.
519	var reterrs []error
520	res, err := c.coll.BulkWrite(ctx, models, bopts)
521	if err != nil {
522		bwe, ok := err.(mongo.BulkWriteException)
523		if !ok { // assume everything failed with this error
524			return []error{err}
525		}
526		// The returned indexes of the WriteErrors are wrong. See https://jira.mongodb.org/browse/GODRIVER-1028.
527		// Until it's fixed, use negative values for the indexes in the errors we return.
528		for _, w := range bwe.WriteErrors {
529			reterrs = append(reterrs, gcerr.Newf(translateMongoCode(w.Code), w, "%s", w.Message))
530		}
531		return reterrs
532	}
533	for i, newID := range newIDs {
534		if newID == nil {
535			continue
536		}
537		a := modelActions[i]
538		if err := a.Doc.SetField(c.idField, newID); err != nil {
539			errs[a.Index] = err
540		}
541	}
542	for i, rev := range revs {
543		a := modelActions[i]
544		if rev != "" && c.hasField(a.Doc, c.revisionField) {
545			if err := a.Doc.SetField(c.revisionField, rev); err != nil && errs[a.Index] == nil {
546				errs[a.Index] = err
547			}
548		}
549	}
550	if res.DeletedCount != nDeletes {
551		// Some Delete actions failed. It's not an error if a Delete failed because
552		// the document didn't exist, but it is an error if it failed because of a
553		// precondition mismatch. Find all the documents with revisions we tried to delete; if
554		// any are still present, that's an error.
555		c.determineDeleteErrors(ctx, models, modelActions, errs)
556	}
557	if res.MatchedCount+res.UpsertedCount != nNonCreateWrite {
558		reterrs = append(reterrs, gcerr.Newf(gcerr.NotFound, nil, "some writes failed (replaced %d, upserted %d, out of total %d)", res.MatchedCount, res.UpsertedCount, nNonCreateWrite))
559	}
560	return reterrs
561}
562
563func (c *collection) determineDeleteErrors(ctx context.Context, models []mongo.WriteModel, actions []*driver.Action, errs []error) {
564	// TODO(jba): do this concurrently.
565	for i, m := range models {
566		if dm, ok := m.(*mongo.DeleteOneModel); ok {
567			filter := dm.Filter.(bson.D)
568			if len(filter) > 1 {
569				// Delete with both ID and revision. See if the document is still there.
570				idOnlyFilter := filter[:1]
571				// TODO(shantuo): use Find instead of FindOne.
572				res := c.coll.FindOne(ctx, idOnlyFilter)
573
574				// Assume an error means the document wasn't found.
575				// That means either that it was deleted successfully, or that it never
576				// existed. Either way, it's not an error.
577				// TODO(jba): distinguish between not found and other errors.
578				if res.Err() == nil {
579					// The document exists, but we didn't delete it: assume we had the wrong
580					// revision.
581					errs[actions[i].Index] = gcerr.Newf(gcerr.FailedPrecondition, nil,
582						"wrong revision for document with ID %v", actions[i].Key)
583				}
584			}
585		}
586	}
587}
588
589func (c *collection) newCreateModel(a *driver.Action) (*mongo.InsertOneModel, interface{}, string, error) {
590	mdoc, createdID, rev, err := c.prepareCreate(a)
591	if err != nil {
592		return nil, nil, "", err
593	}
594	return &mongo.InsertOneModel{Document: mdoc}, createdID, rev, nil
595}
596
597func (c *collection) newDeleteModel(a *driver.Action) (*mongo.DeleteOneModel, error) {
598	id, err := encodeValue(a.Key)
599	if err != nil {
600		return nil, err
601	}
602	filter, _, err := c.makeFilter(id, a.Doc)
603	if err != nil {
604		return nil, err
605	}
606	return &mongo.DeleteOneModel{Filter: filter}, nil
607}
608
609func (c *collection) newReplaceModel(a *driver.Action, upsert bool) (*mongo.ReplaceOneModel, string, error) {
610	filter, mdoc, rev, err := c.prepareReplace(a)
611	if err != nil {
612		return nil, "", err
613	}
614	return &mongo.ReplaceOneModel{
615		Filter:      filter,
616		Replacement: mdoc,
617		Upsert:      &upsert,
618	}, rev, nil
619}
620
621func (c *collection) newUpdateModel(a *driver.Action) (*mongo.UpdateOneModel, string, error) {
622	filter, updateDoc, rev, err := c.prepareUpdate(a)
623	if err != nil {
624		return nil, "", err
625	}
626	if filter == nil { // no-op
627		return nil, "", nil
628	}
629	return &mongo.UpdateOneModel{Filter: filter, Update: updateDoc}, rev, nil
630}
631
632// RevisionToBytes implements driver.RevisionToBytes.
633func (c *collection) RevisionToBytes(rev interface{}) ([]byte, error) {
634	s, ok := rev.(string)
635	if !ok {
636		return nil, gcerr.Newf(gcerr.InvalidArgument, nil, "revision %v of type %[1]T is not a string", rev)
637	}
638	return []byte(s), nil
639}
640
641func (c *collection) hasField(doc driver.Document, field string) bool {
642	if c.opts.LowercaseFields {
643		return doc.HasFieldFold(field)
644	}
645	return doc.HasField(field)
646}
647
648// BytesToRevision implements driver.BytesToRevision.
649func (c *collection) BytesToRevision(b []byte) (interface{}, error) {
650	return string(b), nil
651}
652
653// As implements driver.As.
654func (c *collection) As(i interface{}) bool {
655	p, ok := i.(**mongo.Collection)
656	if !ok {
657		return false
658	}
659	*p = c.coll
660	return true
661}
662
663// ErrorAs implements driver.Collection.ErrorAs
664func (c *collection) ErrorAs(err error, i interface{}) bool {
665	switch e := err.(type) {
666	case mongo.CommandError:
667		if p, ok := i.(*mongo.CommandError); ok {
668			*p = e
669			return true
670		}
671	case mongo.BulkWriteError:
672		if p, ok := i.(*mongo.BulkWriteError); ok {
673			*p = e
674			return true
675		}
676	case mongo.BulkWriteException:
677		if p, ok := i.(*mongo.BulkWriteException); ok {
678			*p = e
679			return true
680		}
681	}
682	return false
683}
684
685// ErrorCode implements driver.Collection.ErrorCode.
686func (c *collection) ErrorCode(err error) gcerrors.ErrorCode {
687	if g, ok := err.(*gcerr.Error); ok {
688		return g.Code
689	}
690	if err == mongo.ErrNoDocuments {
691		return gcerrors.NotFound
692	}
693	if wexc, ok := err.(mongo.WriteException); ok && len(wexc.WriteErrors) > 0 {
694		return translateMongoCode(wexc.WriteErrors[0].Code)
695	}
696	return gcerrors.Unknown
697}
698
699// Close implements driver.Collection.Close.
700func (c *collection) Close() error { return nil }
701
// mongoDupKeyCode is the code of a write error caused by a duplicate key;
// translateMongoCode maps it to gcerrors.AlreadyExists.
// (The Go mongo driver doesn't define an exported constant for this.)
const mongoDupKeyCode = 11000
705
706func translateMongoCode(code int) gcerrors.ErrorCode {
707	switch code {
708	case mongoDupKeyCode:
709		return gcerrors.AlreadyExists
710	default:
711		return gcerrors.Unknown
712	}
713}
714