1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package unified 8 9import ( 10 "context" 11 "fmt" 12 "time" 13 14 "go.mongodb.org/mongo-driver/bson/primitive" 15 testhelpers "go.mongodb.org/mongo-driver/internal/testutil/helpers" 16 "go.mongodb.org/mongo-driver/mongo" 17 "go.mongodb.org/mongo-driver/mongo/options" 18 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 19) 20 21// This file contains helpers to execute client operations. 22 23func executeCreateChangeStream(ctx context.Context, operation *Operation) (*OperationResult, error) { 24 var watcher interface { 25 Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) 26 } 27 var err error 28 29 watcher, err = Entities(ctx).Client(operation.Object) 30 if err != nil { 31 watcher, err = Entities(ctx).Database(operation.Object) 32 } 33 if err != nil { 34 watcher, err = Entities(ctx).Collection(operation.Object) 35 } 36 if err != nil { 37 return nil, fmt.Errorf("no client, database, or collection entity found with ID %q", operation.Object) 38 } 39 40 var pipeline []interface{} 41 opts := options.ChangeStream() 42 43 elems, _ := operation.Arguments.Elements() 44 for _, elem := range elems { 45 key := elem.Key() 46 val := elem.Value() 47 48 switch key { 49 case "batchSize": 50 opts.SetBatchSize(val.Int32()) 51 case "collation": 52 collation, err := createCollation(val.Document()) 53 if err != nil { 54 return nil, fmt.Errorf("error creating collation: %v", err) 55 } 56 opts.SetCollation(*collation) 57 case "fullDocument": 58 switch fd := val.StringValue(); fd { 59 case "default": 60 opts.SetFullDocument(options.Default) 61 case "updateLookup": 62 opts.SetFullDocument(options.UpdateLookup) 63 default: 64 return nil, fmt.Errorf("unrecognized fullDocument value %q", fd) 65 } 66 case "maxAwaitTimeMS": 67 opts.SetMaxAwaitTime(time.Duration(val.Int32()) * time.Millisecond) 68 case "pipeline": 69 pipeline = testhelpers.RawToInterfaceSlice(val.Array()) 70 case "resumeAfter": 71 opts.SetResumeAfter(val.Document()) 72 case "startAfter": 73 opts.SetStartAfter(val.Document()) 74 case "startAtOperationTime": 75 t, i := val.Timestamp() 76 opts.SetStartAtOperationTime(&primitive.Timestamp{T: t, I: i}) 77 default: 78 return nil, fmt.Errorf("unrecognized createChangeStream option %q", key) 79 } 80 } 81 if pipeline == nil { 82 return nil, newMissingArgumentError("pipeline") 83 } 84 85 stream, err := watcher.Watch(ctx, pipeline, opts) 86 if err != nil { 87 return NewErrorResult(err), nil 88 } 89 90 if operation.ResultEntityID == nil { 91 return nil, fmt.Errorf("no entity name provided to store executeChangeStream result") 92 } 93 if err := Entities(ctx).AddChangeStreamEntity(*operation.ResultEntityID, stream); err != nil { 94 return nil, fmt.Errorf("error storing result as changeStream entity: %v", err) 95 } 96 return NewEmptyResult(), nil 97} 98 99func executeListDatabases(ctx context.Context, operation *Operation) (*OperationResult, error) { 100 client, err := Entities(ctx).Client(operation.Object) 101 if err != nil { 102 return nil, err 103 } 104 105 // We set a default filter rather than erroring if the Arguments doc doesn't have a "filter" field because the 106 // spec says drivers should support this field, not must. 107 filter := emptyDocument 108 opts := options.ListDatabases() 109 110 elems, _ := operation.Arguments.Elements() 111 for _, elem := range elems { 112 key := elem.Key() 113 val := elem.Value() 114 115 switch key { 116 case "authorizedDatabases": 117 opts.SetAuthorizedDatabases(val.Boolean()) 118 case "filter": 119 filter = val.Document() 120 case "nameOnly": 121 opts.SetNameOnly(val.Boolean()) 122 default: 123 return nil, fmt.Errorf("unrecognized listDatabases option %q", key) 124 } 125 } 126 127 res, err := client.ListDatabases(ctx, filter, opts) 128 if err != nil { 129 return NewErrorResult(err), nil 130 } 131 132 specsArray := bsoncore.NewArrayBuilder() 133 for _, spec := range res.Databases { 134 rawSpec := bsoncore.NewDocumentBuilder(). 135 AppendString("name", spec.Name). 136 AppendInt64("sizeOnDisk", spec.SizeOnDisk). 137 AppendBoolean("empty", spec.Empty). 138 Build() 139 140 specsArray.AppendDocument(rawSpec) 141 } 142 raw := bsoncore.NewDocumentBuilder(). 143 AppendArray("databases", specsArray.Build()). 144 AppendInt64("totalSize", res.TotalSize). 145 Build() 146 return NewDocumentResult(raw, nil), nil 147} 148