1// Copyright (C) MongoDB, Inc. 2019-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7// Code generated by operationgen. DO NOT EDIT.
8
9package operation
10
11import (
12	"context"
13	"errors"
14
15	"go.mongodb.org/mongo-driver/bson/bsontype"
16	"go.mongodb.org/mongo-driver/event"
17	"go.mongodb.org/mongo-driver/mongo/description"
18	"go.mongodb.org/mongo-driver/mongo/readconcern"
19	"go.mongodb.org/mongo-driver/mongo/readpref"
20	"go.mongodb.org/mongo-driver/mongo/writeconcern"
21	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
22	"go.mongodb.org/mongo-driver/x/mongo/driver"
23	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
24)
25
26// Performs an aggregate operation
27type Aggregate struct {
28	allowDiskUse             *bool
29	batchSize                *int32
30	bypassDocumentValidation *bool
31	collation                bsoncore.Document
32	comment                  *string
33	hint                     bsoncore.Value
34	maxTimeMS                *int64
35	pipeline                 bsoncore.Document
36	session                  *session.Client
37	clock                    *session.ClusterClock
38	collection               string
39	monitor                  *event.CommandMonitor
40	database                 string
41	deployment               driver.Deployment
42	readConcern              *readconcern.ReadConcern
43	readPreference           *readpref.ReadPref
44	retry                    *driver.RetryMode
45	selector                 description.ServerSelector
46	writeConcern             *writeconcern.WriteConcern
47	crypt                    *driver.Crypt
48
49	result driver.CursorResponse
50}
51
52// NewAggregate constructs and returns a new Aggregate.
53func NewAggregate(pipeline bsoncore.Document) *Aggregate {
54	return &Aggregate{
55		pipeline: pipeline,
56	}
57}
58
59// Result returns the result of executing this operation.
60func (a *Aggregate) Result(opts driver.CursorOptions) (*driver.BatchCursor, error) {
61
62	clientSession := a.session
63
64	clock := a.clock
65	return driver.NewBatchCursor(a.result, clientSession, clock, opts)
66}
67
68func (a *Aggregate) ResultCursorResponse() driver.CursorResponse {
69	return a.result
70}
71
72func (a *Aggregate) processResponse(response bsoncore.Document, srvr driver.Server, desc description.Server, currIndex int) error {
73	var err error
74
75	a.result, err = driver.NewCursorResponse(response, srvr, desc)
76	return err
77
78}
79
80// Execute runs this operations and returns an error if the operaiton did not execute successfully.
81func (a *Aggregate) Execute(ctx context.Context) error {
82	if a.deployment == nil {
83		return errors.New("the Aggregate operation must have a Deployment set before Execute can be called")
84	}
85
86	return driver.Operation{
87		CommandFn:         a.command,
88		ProcessResponseFn: a.processResponse,
89
90		Client:                         a.session,
91		Clock:                          a.clock,
92		CommandMonitor:                 a.monitor,
93		Database:                       a.database,
94		Deployment:                     a.deployment,
95		ReadConcern:                    a.readConcern,
96		ReadPreference:                 a.readPreference,
97		Type:                           driver.Read,
98		RetryMode:                      a.retry,
99		Selector:                       a.selector,
100		WriteConcern:                   a.writeConcern,
101		Crypt:                          a.crypt,
102		MinimumWriteConcernWireVersion: 5,
103	}.Execute(ctx, nil)
104
105}
106
107func (a *Aggregate) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
108	header := bsoncore.Value{Type: bsontype.String, Data: bsoncore.AppendString(nil, a.collection)}
109	if a.collection == "" {
110		header = bsoncore.Value{Type: bsontype.Int32, Data: []byte{0x01, 0x00, 0x00, 0x00}}
111	}
112	dst = bsoncore.AppendValueElement(dst, "aggregate", header)
113
114	cursorIdx, cursorDoc := bsoncore.AppendDocumentStart(nil)
115	if a.allowDiskUse != nil {
116
117		dst = bsoncore.AppendBooleanElement(dst, "allowDiskUse", *a.allowDiskUse)
118	}
119	if a.batchSize != nil {
120		cursorDoc = bsoncore.AppendInt32Element(cursorDoc, "batchSize", *a.batchSize)
121	}
122	if a.bypassDocumentValidation != nil {
123
124		dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *a.bypassDocumentValidation)
125	}
126	if a.collation != nil {
127
128		if desc.WireVersion == nil || !desc.WireVersion.Includes(5) {
129			return nil, errors.New("the 'collation' command parameter requires a minimum server wire version of 5")
130		}
131		dst = bsoncore.AppendDocumentElement(dst, "collation", a.collation)
132	}
133	if a.comment != nil {
134
135		dst = bsoncore.AppendStringElement(dst, "comment", *a.comment)
136	}
137	if a.hint.Type != bsontype.Type(0) {
138
139		dst = bsoncore.AppendValueElement(dst, "hint", a.hint)
140	}
141	if a.maxTimeMS != nil {
142
143		dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", *a.maxTimeMS)
144	}
145	if a.pipeline != nil {
146
147		dst = bsoncore.AppendArrayElement(dst, "pipeline", a.pipeline)
148	}
149	cursorDoc, _ = bsoncore.AppendDocumentEnd(cursorDoc, cursorIdx)
150	dst = bsoncore.AppendDocumentElement(dst, "cursor", cursorDoc)
151
152	return dst, nil
153}
154
155// AllowDiskUse enables writing to temporary files. When true, aggregation stages can write to the dbPath/_tmp directory.
156func (a *Aggregate) AllowDiskUse(allowDiskUse bool) *Aggregate {
157	if a == nil {
158		a = new(Aggregate)
159	}
160
161	a.allowDiskUse = &allowDiskUse
162	return a
163}
164
165// BatchSize specifies the number of documents to return in every batch.
166func (a *Aggregate) BatchSize(batchSize int32) *Aggregate {
167	if a == nil {
168		a = new(Aggregate)
169	}
170
171	a.batchSize = &batchSize
172	return a
173}
174
175// BypassDocumentValidation allows the write to opt-out of document level validation. This only applies when the $out stage is specified.
176func (a *Aggregate) BypassDocumentValidation(bypassDocumentValidation bool) *Aggregate {
177	if a == nil {
178		a = new(Aggregate)
179	}
180
181	a.bypassDocumentValidation = &bypassDocumentValidation
182	return a
183}
184
185// Collation specifies a collation. This option is only valid for server versions 3.4 and above.
186func (a *Aggregate) Collation(collation bsoncore.Document) *Aggregate {
187	if a == nil {
188		a = new(Aggregate)
189	}
190
191	a.collation = collation
192	return a
193}
194
195// Comment specifies an arbitrary string to help trace the operation through the database profiler, currentOp, and logs.
196func (a *Aggregate) Comment(comment string) *Aggregate {
197	if a == nil {
198		a = new(Aggregate)
199	}
200
201	a.comment = &comment
202	return a
203}
204
205// Hint specifies the index to use.
206func (a *Aggregate) Hint(hint bsoncore.Value) *Aggregate {
207	if a == nil {
208		a = new(Aggregate)
209	}
210
211	a.hint = hint
212	return a
213}
214
215// MaxTimeMS specifies the maximum amount of time to allow the query to run.
216func (a *Aggregate) MaxTimeMS(maxTimeMS int64) *Aggregate {
217	if a == nil {
218		a = new(Aggregate)
219	}
220
221	a.maxTimeMS = &maxTimeMS
222	return a
223}
224
225// Pipeline determines how data is transformed for an aggregation.
226func (a *Aggregate) Pipeline(pipeline bsoncore.Document) *Aggregate {
227	if a == nil {
228		a = new(Aggregate)
229	}
230
231	a.pipeline = pipeline
232	return a
233}
234
235// Session sets the session for this operation.
236func (a *Aggregate) Session(session *session.Client) *Aggregate {
237	if a == nil {
238		a = new(Aggregate)
239	}
240
241	a.session = session
242	return a
243}
244
245// ClusterClock sets the cluster clock for this operation.
246func (a *Aggregate) ClusterClock(clock *session.ClusterClock) *Aggregate {
247	if a == nil {
248		a = new(Aggregate)
249	}
250
251	a.clock = clock
252	return a
253}
254
255// Collection sets the collection that this command will run against.
256func (a *Aggregate) Collection(collection string) *Aggregate {
257	if a == nil {
258		a = new(Aggregate)
259	}
260
261	a.collection = collection
262	return a
263}
264
265// CommandMonitor sets the monitor to use for APM events.
266func (a *Aggregate) CommandMonitor(monitor *event.CommandMonitor) *Aggregate {
267	if a == nil {
268		a = new(Aggregate)
269	}
270
271	a.monitor = monitor
272	return a
273}
274
275// Database sets the database to run this operation against.
276func (a *Aggregate) Database(database string) *Aggregate {
277	if a == nil {
278		a = new(Aggregate)
279	}
280
281	a.database = database
282	return a
283}
284
285// Deployment sets the deployment to use for this operation.
286func (a *Aggregate) Deployment(deployment driver.Deployment) *Aggregate {
287	if a == nil {
288		a = new(Aggregate)
289	}
290
291	a.deployment = deployment
292	return a
293}
294
295// ReadConcern specifies the read concern for this operation.
296func (a *Aggregate) ReadConcern(readConcern *readconcern.ReadConcern) *Aggregate {
297	if a == nil {
298		a = new(Aggregate)
299	}
300
301	a.readConcern = readConcern
302	return a
303}
304
305// ReadPreference set the read prefernce used with this operation.
306func (a *Aggregate) ReadPreference(readPreference *readpref.ReadPref) *Aggregate {
307	if a == nil {
308		a = new(Aggregate)
309	}
310
311	a.readPreference = readPreference
312	return a
313}
314
315// ServerSelector sets the selector used to retrieve a server.
316func (a *Aggregate) ServerSelector(selector description.ServerSelector) *Aggregate {
317	if a == nil {
318		a = new(Aggregate)
319	}
320
321	a.selector = selector
322	return a
323}
324
325// WriteConcern sets the write concern for this operation.
326func (a *Aggregate) WriteConcern(writeConcern *writeconcern.WriteConcern) *Aggregate {
327	if a == nil {
328		a = new(Aggregate)
329	}
330
331	a.writeConcern = writeConcern
332	return a
333}
334
335// Retry enables retryable writes for this operation. Retries are not handled automatically,
336// instead a boolean is returned from Execute and SelectAndExecute that indicates if the
337// operation can be retried. Retrying is handled by calling RetryExecute.
338func (a *Aggregate) Retry(retry driver.RetryMode) *Aggregate {
339	if a == nil {
340		a = new(Aggregate)
341	}
342
343	a.retry = &retry
344	return a
345}
346
347// Crypt sets the Crypt object to use for automatic encryption and decryption.
348func (a *Aggregate) Crypt(crypt *driver.Crypt) *Aggregate {
349	if a == nil {
350		a = new(Aggregate)
351	}
352
353	a.crypt = crypt
354	return a
355}
356