1// Copyright (C) MongoDB, Inc. 2019-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7// Code generated by operationgen. DO NOT EDIT. 8 9package operation 10 11import ( 12 "context" 13 "errors" 14 15 "go.mongodb.org/mongo-driver/bson/bsontype" 16 "go.mongodb.org/mongo-driver/event" 17 "go.mongodb.org/mongo-driver/mongo/readconcern" 18 "go.mongodb.org/mongo-driver/mongo/readpref" 19 "go.mongodb.org/mongo-driver/mongo/writeconcern" 20 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 21 "go.mongodb.org/mongo-driver/x/mongo/driver" 22 "go.mongodb.org/mongo-driver/x/mongo/driver/description" 23 "go.mongodb.org/mongo-driver/x/mongo/driver/session" 24) 25 26// Performs an aggregate operation 27type Aggregate struct { 28 allowDiskUse *bool 29 batchSize *int32 30 bypassDocumentValidation *bool 31 collation bsoncore.Document 32 comment *string 33 hint bsoncore.Value 34 maxTimeMS *int64 35 pipeline bsoncore.Document 36 session *session.Client 37 clock *session.ClusterClock 38 collection string 39 monitor *event.CommandMonitor 40 database string 41 deployment driver.Deployment 42 readConcern *readconcern.ReadConcern 43 readPreference *readpref.ReadPref 44 retry *driver.RetryMode 45 selector description.ServerSelector 46 writeConcern *writeconcern.WriteConcern 47 crypt *driver.Crypt 48 49 result driver.CursorResponse 50} 51 52// NewAggregate constructs and returns a new Aggregate. 53func NewAggregate(pipeline bsoncore.Document) *Aggregate { 54 return &Aggregate{ 55 pipeline: pipeline, 56 } 57} 58 59// Result returns the result of executing this operation. 60func (a *Aggregate) Result(opts driver.CursorOptions) (*driver.BatchCursor, error) { 61 62 clientSession := a.session 63 64 clock := a.clock 65 return driver.NewBatchCursor(a.result, clientSession, clock, opts) 66} 67 68func (a *Aggregate) ResultCursorResponse() driver.CursorResponse { 69 return a.result 70} 71 72func (a *Aggregate) processResponse(response bsoncore.Document, srvr driver.Server, desc description.Server, currIndex int) error { 73 var err error 74 75 a.result, err = driver.NewCursorResponse(response, srvr, desc) 76 return err 77 78} 79 80// Execute runs this operations and returns an error if the operaiton did not execute successfully. 81func (a *Aggregate) Execute(ctx context.Context) error { 82 if a.deployment == nil { 83 return errors.New("the Aggregate operation must have a Deployment set before Execute can be called") 84 } 85 86 return driver.Operation{ 87 CommandFn: a.command, 88 ProcessResponseFn: a.processResponse, 89 90 Client: a.session, 91 Clock: a.clock, 92 CommandMonitor: a.monitor, 93 Database: a.database, 94 Deployment: a.deployment, 95 ReadConcern: a.readConcern, 96 ReadPreference: a.readPreference, 97 Type: driver.Read, 98 RetryMode: a.retry, 99 Selector: a.selector, 100 WriteConcern: a.writeConcern, 101 Crypt: a.crypt, 102 MinimumWriteConcernWireVersion: 5, 103 }.Execute(ctx, nil) 104 105} 106 107func (a *Aggregate) command(dst []byte, desc description.SelectedServer) ([]byte, error) { 108 header := bsoncore.Value{Type: bsontype.String, Data: bsoncore.AppendString(nil, a.collection)} 109 if a.collection == "" { 110 header = bsoncore.Value{Type: bsontype.Int32, Data: []byte{0x01, 0x00, 0x00, 0x00}} 111 } 112 dst = bsoncore.AppendValueElement(dst, "aggregate", header) 113 114 cursorIdx, cursorDoc := bsoncore.AppendDocumentStart(nil) 115 if a.allowDiskUse != nil { 116 117 dst = bsoncore.AppendBooleanElement(dst, "allowDiskUse", *a.allowDiskUse) 118 } 119 if a.batchSize != nil { 120 cursorDoc = bsoncore.AppendInt32Element(cursorDoc, "batchSize", *a.batchSize) 121 } 122 if a.bypassDocumentValidation != nil { 123 124 dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *a.bypassDocumentValidation) 125 } 126 if a.collation != nil { 127 128 if desc.WireVersion == nil || !desc.WireVersion.Includes(5) { 129 return nil, errors.New("the 'collation' command parameter requires a minimum server wire version of 5") 130 } 131 dst = bsoncore.AppendDocumentElement(dst, "collation", a.collation) 132 } 133 if a.comment != nil { 134 135 dst = bsoncore.AppendStringElement(dst, "comment", *a.comment) 136 } 137 if a.hint.Type != bsontype.Type(0) { 138 139 dst = bsoncore.AppendValueElement(dst, "hint", a.hint) 140 } 141 if a.maxTimeMS != nil { 142 143 dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", *a.maxTimeMS) 144 } 145 if a.pipeline != nil { 146 147 dst = bsoncore.AppendArrayElement(dst, "pipeline", a.pipeline) 148 } 149 cursorDoc, _ = bsoncore.AppendDocumentEnd(cursorDoc, cursorIdx) 150 dst = bsoncore.AppendDocumentElement(dst, "cursor", cursorDoc) 151 152 return dst, nil 153} 154 155// AllowDiskUse enables writing to temporary files. When true, aggregation stages can write to the dbPath/_tmp directory. 156func (a *Aggregate) AllowDiskUse(allowDiskUse bool) *Aggregate { 157 if a == nil { 158 a = new(Aggregate) 159 } 160 161 a.allowDiskUse = &allowDiskUse 162 return a 163} 164 165// BatchSize specifies the number of documents to return in every batch. 166func (a *Aggregate) BatchSize(batchSize int32) *Aggregate { 167 if a == nil { 168 a = new(Aggregate) 169 } 170 171 a.batchSize = &batchSize 172 return a 173} 174 175// BypassDocumentValidation allows the write to opt-out of document level validation. This only applies when the $out stage is specified. 176func (a *Aggregate) BypassDocumentValidation(bypassDocumentValidation bool) *Aggregate { 177 if a == nil { 178 a = new(Aggregate) 179 } 180 181 a.bypassDocumentValidation = &bypassDocumentValidation 182 return a 183} 184 185// Collation specifies a collation. This option is only valid for server versions 3.4 and above. 186func (a *Aggregate) Collation(collation bsoncore.Document) *Aggregate { 187 if a == nil { 188 a = new(Aggregate) 189 } 190 191 a.collation = collation 192 return a 193} 194 195// Comment specifies an arbitrary string to help trace the operation through the database profiler, currentOp, and logs. 196func (a *Aggregate) Comment(comment string) *Aggregate { 197 if a == nil { 198 a = new(Aggregate) 199 } 200 201 a.comment = &comment 202 return a 203} 204 205// Hint specifies the index to use. 206func (a *Aggregate) Hint(hint bsoncore.Value) *Aggregate { 207 if a == nil { 208 a = new(Aggregate) 209 } 210 211 a.hint = hint 212 return a 213} 214 215// MaxTimeMS specifies the maximum amount of time to allow the query to run. 216func (a *Aggregate) MaxTimeMS(maxTimeMS int64) *Aggregate { 217 if a == nil { 218 a = new(Aggregate) 219 } 220 221 a.maxTimeMS = &maxTimeMS 222 return a 223} 224 225// Pipeline determines how data is transformed for an aggregation. 226func (a *Aggregate) Pipeline(pipeline bsoncore.Document) *Aggregate { 227 if a == nil { 228 a = new(Aggregate) 229 } 230 231 a.pipeline = pipeline 232 return a 233} 234 235// Session sets the session for this operation. 236func (a *Aggregate) Session(session *session.Client) *Aggregate { 237 if a == nil { 238 a = new(Aggregate) 239 } 240 241 a.session = session 242 return a 243} 244 245// ClusterClock sets the cluster clock for this operation. 246func (a *Aggregate) ClusterClock(clock *session.ClusterClock) *Aggregate { 247 if a == nil { 248 a = new(Aggregate) 249 } 250 251 a.clock = clock 252 return a 253} 254 255// Collection sets the collection that this command will run against. 256func (a *Aggregate) Collection(collection string) *Aggregate { 257 if a == nil { 258 a = new(Aggregate) 259 } 260 261 a.collection = collection 262 return a 263} 264 265// CommandMonitor sets the monitor to use for APM events. 266func (a *Aggregate) CommandMonitor(monitor *event.CommandMonitor) *Aggregate { 267 if a == nil { 268 a = new(Aggregate) 269 } 270 271 a.monitor = monitor 272 return a 273} 274 275// Database sets the database to run this operation against. 276func (a *Aggregate) Database(database string) *Aggregate { 277 if a == nil { 278 a = new(Aggregate) 279 } 280 281 a.database = database 282 return a 283} 284 285// Deployment sets the deployment to use for this operation. 286func (a *Aggregate) Deployment(deployment driver.Deployment) *Aggregate { 287 if a == nil { 288 a = new(Aggregate) 289 } 290 291 a.deployment = deployment 292 return a 293} 294 295// ReadConcern specifies the read concern for this operation. 296func (a *Aggregate) ReadConcern(readConcern *readconcern.ReadConcern) *Aggregate { 297 if a == nil { 298 a = new(Aggregate) 299 } 300 301 a.readConcern = readConcern 302 return a 303} 304 305// ReadPreference set the read prefernce used with this operation. 306func (a *Aggregate) ReadPreference(readPreference *readpref.ReadPref) *Aggregate { 307 if a == nil { 308 a = new(Aggregate) 309 } 310 311 a.readPreference = readPreference 312 return a 313} 314 315// ServerSelector sets the selector used to retrieve a server. 316func (a *Aggregate) ServerSelector(selector description.ServerSelector) *Aggregate { 317 if a == nil { 318 a = new(Aggregate) 319 } 320 321 a.selector = selector 322 return a 323} 324 325// WriteConcern sets the write concern for this operation. 326func (a *Aggregate) WriteConcern(writeConcern *writeconcern.WriteConcern) *Aggregate { 327 if a == nil { 328 a = new(Aggregate) 329 } 330 331 a.writeConcern = writeConcern 332 return a 333} 334 335// Retry enables retryable writes for this operation. Retries are not handled automatically, 336// instead a boolean is returned from Execute and SelectAndExecute that indicates if the 337// operation can be retried. Retrying is handled by calling RetryExecute. 338func (a *Aggregate) Retry(retry driver.RetryMode) *Aggregate { 339 if a == nil { 340 a = new(Aggregate) 341 } 342 343 a.retry = &retry 344 return a 345} 346 347// Crypt sets the Crypt object to use for automatic encryption and decryption. 348func (a *Aggregate) Crypt(crypt *driver.Crypt) *Aggregate { 349 if a == nil { 350 a = new(Aggregate) 351 } 352 353 a.crypt = crypt 354 return a 355} 356