1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package unified
8
9import (
10	"context"
11	"fmt"
12	"time"
13
14	"go.mongodb.org/mongo-driver/bson/primitive"
15	testhelpers "go.mongodb.org/mongo-driver/internal/testutil/helpers"
16	"go.mongodb.org/mongo-driver/mongo"
17	"go.mongodb.org/mongo-driver/mongo/options"
18	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
19)
20
21// This file contains helpers to execute client operations.
22
23func executeCreateChangeStream(ctx context.Context, operation *Operation) (*OperationResult, error) {
24	var watcher interface {
25		Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
26	}
27	var err error
28
29	watcher, err = Entities(ctx).Client(operation.Object)
30	if err != nil {
31		watcher, err = Entities(ctx).Database(operation.Object)
32	}
33	if err != nil {
34		watcher, err = Entities(ctx).Collection(operation.Object)
35	}
36	if err != nil {
37		return nil, fmt.Errorf("no client, database, or collection entity found with ID %q", operation.Object)
38	}
39
40	var pipeline []interface{}
41	opts := options.ChangeStream()
42
43	elems, _ := operation.Arguments.Elements()
44	for _, elem := range elems {
45		key := elem.Key()
46		val := elem.Value()
47
48		switch key {
49		case "batchSize":
50			opts.SetBatchSize(val.Int32())
51		case "collation":
52			collation, err := createCollation(val.Document())
53			if err != nil {
54				return nil, fmt.Errorf("error creating collation: %v", err)
55			}
56			opts.SetCollation(*collation)
57		case "fullDocument":
58			switch fd := val.StringValue(); fd {
59			case "default":
60				opts.SetFullDocument(options.Default)
61			case "updateLookup":
62				opts.SetFullDocument(options.UpdateLookup)
63			default:
64				return nil, fmt.Errorf("unrecognized fullDocument value %q", fd)
65			}
66		case "maxAwaitTimeMS":
67			opts.SetMaxAwaitTime(time.Duration(val.Int32()) * time.Millisecond)
68		case "pipeline":
69			pipeline = testhelpers.RawToInterfaceSlice(val.Array())
70		case "resumeAfter":
71			opts.SetResumeAfter(val.Document())
72		case "startAfter":
73			opts.SetStartAfter(val.Document())
74		case "startAtOperationTime":
75			t, i := val.Timestamp()
76			opts.SetStartAtOperationTime(&primitive.Timestamp{T: t, I: i})
77		default:
78			return nil, fmt.Errorf("unrecognized createChangeStream option %q", key)
79		}
80	}
81	if pipeline == nil {
82		return nil, newMissingArgumentError("pipeline")
83	}
84
85	stream, err := watcher.Watch(ctx, pipeline, opts)
86	if err != nil {
87		return NewErrorResult(err), nil
88	}
89
90	if operation.ResultEntityID == nil {
91		return nil, fmt.Errorf("no entity name provided to store executeChangeStream result")
92	}
93	if err := Entities(ctx).AddChangeStreamEntity(*operation.ResultEntityID, stream); err != nil {
94		return nil, fmt.Errorf("error storing result as changeStream entity: %v", err)
95	}
96	return NewEmptyResult(), nil
97}
98
99func executeListDatabases(ctx context.Context, operation *Operation) (*OperationResult, error) {
100	client, err := Entities(ctx).Client(operation.Object)
101	if err != nil {
102		return nil, err
103	}
104
105	// We set a default filter rather than erroring if the Arguments doc doesn't have a "filter" field because the
106	// spec says drivers should support this field, not must.
107	filter := emptyDocument
108	opts := options.ListDatabases()
109
110	elems, _ := operation.Arguments.Elements()
111	for _, elem := range elems {
112		key := elem.Key()
113		val := elem.Value()
114
115		switch key {
116		case "authorizedDatabases":
117			opts.SetAuthorizedDatabases(val.Boolean())
118		case "filter":
119			filter = val.Document()
120		case "nameOnly":
121			opts.SetNameOnly(val.Boolean())
122		default:
123			return nil, fmt.Errorf("unrecognized listDatabases option %q", key)
124		}
125	}
126
127	res, err := client.ListDatabases(ctx, filter, opts)
128	if err != nil {
129		return NewErrorResult(err), nil
130	}
131
132	specsArray := bsoncore.NewArrayBuilder()
133	for _, spec := range res.Databases {
134		rawSpec := bsoncore.NewDocumentBuilder().
135			AppendString("name", spec.Name).
136			AppendInt64("sizeOnDisk", spec.SizeOnDisk).
137			AppendBoolean("empty", spec.Empty).
138			Build()
139
140		specsArray.AppendDocument(rawSpec)
141	}
142	raw := bsoncore.NewDocumentBuilder().
143		AppendArray("databases", specsArray.Build()).
144		AppendInt64("totalSize", res.TotalSize).
145		Build()
146	return NewDocumentResult(raw, nil), nil
147}
148