1package mongoreplay 2 3import ( 4 "fmt" 5 "time" 6 7 mgo "github.com/10gen/llmgo" 8 "github.com/10gen/llmgo/bson" 9) 10 11// recordedOpGenerator maintains a pair of connection stubs and channel to allow 12// ops to be generated by the driver and passed to a channel 13type recordedOpGenerator struct { 14 session *SessionStub 15 serverConnection *ConnStub 16 opChan chan *RecordedOp 17} 18 19func newRecordedOpGenerator() *recordedOpGenerator { 20 session := SessionStub{} 21 var serverConnection ConnStub 22 serverConnection, session.connection = newTwoSidedConn() 23 opChan := make(chan *RecordedOp, 1000) 24 25 return &recordedOpGenerator{ 26 session: &session, 27 serverConnection: &serverConnection, 28 opChan: opChan, 29 } 30} 31 32// pushDriverRequestOps takes the pair of ops that the driver generator (the 33// nonce and the main op) and places them in the channel 34func (generator *recordedOpGenerator) pushDriverRequestOps(recordedOp *RecordedOp) { 35 generator.opChan <- recordedOp 36} 37 38type driverRequestOps struct { 39 nonce *RecordedOp 40 op *RecordedOp 41} 42 43func getmoreArgsHelper(cursorID int64, limit int32) bson.D { 44 var getmoreArgs bson.D 45 if limit > 0 { 46 getmoreArgs = bson.D{{"getMore", cursorID}, {"collection", testCollection}, {"batchSize", limit}} 47 } else { 48 getmoreArgs = bson.D{{"getMore", cursorID}, {"collection", testCollection}} 49 } 50 return getmoreArgs 51} 52 53func findArgsHelper(filter interface{}, limit int32) bson.D { 54 var findArgs bson.D 55 if limit > 0 { 56 findArgs = bson.D{{"find", testCollection}, {"filter", filter}, {"batchSize", limit}} 57 } else { 58 findArgs = bson.D{{"find", testCollection}, {"filter", filter}} 59 } 60 return findArgs 61} 62 63func commandReplyArgsHelper(cursorID int64) interface{} { 64 commandReply := &struct { 65 Cursor struct { 66 ID int64 `bson:"id"` 67 } `bson:"cursor"` 68 }{} 69 commandReply.Cursor.ID = cursorID 70 return commandReply 71} 72 73// generateInsert creates a RecordedOp insert using the given documents and 74// pushes it to the recordedOpGenerator's channel to be executed when Play() is 75// called 76func (generator *recordedOpGenerator) generateInsert(docs []interface{}) error { 77 insert := mgo.InsertOp{Collection: fmt.Sprintf("%s.%s", testDB, testCollection), 78 Documents: docs, 79 Flags: 0, 80 } 81 recordedOp, err := generator.fetchRecordedOpsFromConn(&insert) 82 if err != nil { 83 return err 84 } 85 generator.pushDriverRequestOps(recordedOp) 86 return nil 87 88} 89 90func (generator *recordedOpGenerator) generateGetLastError() error { 91 query := bson.D{{"getLastError", 1}} 92 getLastError := mgo.QueryOp{ 93 Collection: "admin.$cmd", 94 Query: query, 95 Limit: -1, 96 Flags: 0, 97 } 98 recordedOp, err := generator.fetchRecordedOpsFromConn(&getLastError) 99 if err != nil { 100 return err 101 } 102 generator.pushDriverRequestOps(recordedOp) 103 return nil 104 105} 106 107func (generator *recordedOpGenerator) generateInsertHelper(name string, startFrom, numInserts int) error { 108 for i := 0; i < numInserts; i++ { 109 doc := testDoc{ 110 Name: name, 111 DocumentNumber: i + startFrom, 112 Success: true, 113 } 114 err := generator.generateInsert([]interface{}{doc}) 115 if err != nil { 116 return err 117 } 118 } 119 return nil 120} 121 122// generateQuery creates a RecordedOp query using the given selector, limit, and 123// requestID, and pushes it to the recordedOpGenerator's channel to be executed 124// when Play() is called 125func (generator *recordedOpGenerator) generateQuery(querySelection interface{}, limit int32, requestID int32) error { 126 query := mgo.QueryOp{ 127 Flags: 0, 128 HasOptions: true, 129 Skip: 0, 130 Limit: limit, 131 Selector: bson.D{}, 132 Query: querySelection, 133 Collection: fmt.Sprintf("%s.%s", testDB, testCollection), 134 Options: mgo.QueryWrapper{}, 135 } 136 137 recordedOp, err := generator.fetchRecordedOpsFromConn(&query) 138 if err != nil { 139 return err 140 } 141 recordedOp.RawOp.Header.RequestID = requestID 142 generator.pushDriverRequestOps(recordedOp) 143 return nil 144} 145 146func (generator *recordedOpGenerator) generateCommandOpInsertHelper(name string, startFrom, numInserts int) error { 147 for i := 0; i < numInserts; i++ { 148 doc := testDoc{ 149 Name: name, 150 DocumentNumber: i + startFrom, 151 Success: true, 152 } 153 commandArgs := bson.D{{"documents", []testDoc{doc}}, {"insert", testCollection}} 154 err := generator.generateCommandOp("insert", commandArgs, 0) 155 if err != nil { 156 return err 157 } 158 } 159 return nil 160} 161 162func (generator *recordedOpGenerator) generateCommandFind(filter interface{}, limit int32, requestID int32) error { 163 findArgs := findArgsHelper(filter, limit) 164 return generator.generateCommandOp("find", findArgs, requestID) 165} 166 167func (generator *recordedOpGenerator) generateCommandGetMore(cursorID int64, limit int32) error { 168 getmoreArgs := getmoreArgsHelper(cursorID, limit) 169 return generator.generateCommandOp("getMore", getmoreArgs, 0) 170} 171func (generator *recordedOpGenerator) generateCommandReply(responseTo int32, cursorID int64) error { 172 commandReply := commandReplyArgsHelper(cursorID) 173 174 reply := mgo.CommandReplyOp{ 175 Metadata: &struct{}{}, 176 CommandReply: commandReply, 177 OutputDocs: []interface{}{}, 178 } 179 180 recordedOp, err := generator.fetchRecordedOpsFromConn(&reply) 181 if err != nil { 182 return err 183 } 184 185 recordedOp.RawOp.Header.ResponseTo = responseTo 186 tempEnd := recordedOp.SrcEndpoint 187 recordedOp.SrcEndpoint = recordedOp.DstEndpoint 188 recordedOp.DstEndpoint = tempEnd 189 generator.pushDriverRequestOps(recordedOp) 190 return nil 191} 192 193func (generator *recordedOpGenerator) generateCommandOp(commandName string, commandArgs bson.D, requestID int32) error { 194 command := mgo.CommandOp{ 195 Database: testDB, 196 CommandName: commandName, 197 Metadata: bson.D{}, 198 CommandArgs: commandArgs, 199 InputDocs: make([]interface{}, 0), 200 } 201 202 recordedOp, err := generator.fetchRecordedOpsFromConn(&command) 203 if err != nil { 204 return err 205 } 206 recordedOp.RawOp.Header.RequestID = requestID 207 generator.pushDriverRequestOps(recordedOp) 208 return nil 209} 210 211// generateGetMore creates a RecordedOp getMore using the given cursorID and 212// limit and pushes it to the recordedOpGenerator's channel to be executed when 213// Play() is called 214func (generator *recordedOpGenerator) generateGetMore(cursorID int64, limit int32) error { 215 getMore := mgo.GetMoreOp{ 216 Collection: fmt.Sprintf("%s.%s", testDB, testCollection), 217 CursorId: cursorID, 218 Limit: limit, 219 } 220 221 recordedOp, err := generator.fetchRecordedOpsFromConn(&getMore) 222 if err != nil { 223 return err 224 } 225 generator.pushDriverRequestOps(recordedOp) 226 return nil 227} 228 229// generateReply creates a RecordedOp reply using the given responseTo, 230// cursorID, and firstDOc, and pushes it to the recordedOpGenerator's channel to 231// be executed when Play() is called 232func (generator *recordedOpGenerator) generateReply(responseTo int32, cursorID int64) error { 233 reply := mgo.ReplyOp{ 234 Flags: 0, 235 CursorId: cursorID, 236 FirstDoc: 0, 237 ReplyDocs: 5, 238 } 239 240 recordedOp, err := generator.fetchRecordedOpsFromConn(&reply) 241 if err != nil { 242 return err 243 } 244 245 recordedOp.RawOp.Header.ResponseTo = responseTo 246 SetInt64(recordedOp.RawOp.Body, 4, cursorID) // change the cursorID field in the RawOp.Body 247 tempEnd := recordedOp.SrcEndpoint 248 recordedOp.SrcEndpoint = recordedOp.DstEndpoint 249 recordedOp.DstEndpoint = tempEnd 250 generator.pushDriverRequestOps(recordedOp) 251 return nil 252} 253 254func (generator *recordedOpGenerator) generateMsgOpInsertHelper(name string, startFrom, numInserts int) error { 255 for i := 0; i < numInserts; i++ { 256 doc := testDoc{ 257 Name: name, 258 DocumentNumber: i + startFrom, 259 Success: true, 260 } 261 err := generator.generateMsgOpAgainstCollection("insert", "documents", []interface{}{doc}, 0) 262 if err != nil { 263 return err 264 } 265 } 266 return nil 267} 268 269func (generator *recordedOpGenerator) generateMsgOpGetMore(cursorID int64, limit int32) error { 270 getmoreArgs := getmoreArgsHelper(cursorID, limit) 271 getmoreArgs = append(getmoreArgs, bson.DocElem{"$db", testDB}) 272 section := mgo.MsgSection{ 273 PayloadType: mgo.MsgPayload0, 274 Data: getmoreArgs, 275 } 276 return generator.generateMsgOp([]mgo.MsgSection{section}, 0) 277} 278 279func (generator *recordedOpGenerator) generateMsgOpFind(filter interface{}, limit, requestID int32) error { 280 findArgs := findArgsHelper(filter, limit) 281 findArgs = append(findArgs, bson.DocElem{"$db", testDB}) 282 section := mgo.MsgSection{ 283 PayloadType: mgo.MsgPayload0, 284 Data: findArgs, 285 } 286 return generator.generateMsgOp([]mgo.MsgSection{section}, requestID) 287} 288 289func (generator *recordedOpGenerator) generateMsgOpReply(responseTo int32, cursorID int64) error { 290 commandReply := commandReplyArgsHelper(cursorID) 291 commandReplyAsSlice, err := bson.Marshal(commandReply) 292 if err != nil { 293 return err 294 } 295 commandReplyAsRaw := &bson.Raw{} 296 297 err = bson.Unmarshal(commandReplyAsSlice, commandReplyAsRaw) 298 if err != nil { 299 return err 300 } 301 302 replySection := mgo.MsgSection{ 303 PayloadType: mgo.MsgPayload0, 304 Data: commandReplyAsRaw, 305 } 306 307 msgOpReply := mgo.MsgOp{ 308 Sections: []mgo.MsgSection{replySection}, 309 } 310 311 recordedOp, err := generator.fetchRecordedOpsFromConn(&msgOpReply) 312 if err != nil { 313 return err 314 } 315 316 recordedOp.RawOp.Header.ResponseTo = responseTo 317 tempEnd := recordedOp.SrcEndpoint 318 recordedOp.SrcEndpoint = recordedOp.DstEndpoint 319 recordedOp.DstEndpoint = tempEnd 320 generator.pushDriverRequestOps(recordedOp) 321 return nil 322} 323 324func (generator *recordedOpGenerator) generateMsgOpAgainstCollection(opName, identifier string, docs []interface{}, requestID int32) error { 325 section0 := mgo.MsgSection{ 326 PayloadType: mgo.MsgPayload0, 327 Data: bson.D{{opName, testCollection}, {"$db", testDB}}, 328 } 329 p1 := mgo.PayloadType1{ 330 Identifier: identifier, 331 Docs: docs, 332 } 333 p1Size, err := p1.CalculateSize() 334 if err != nil { 335 return err 336 } 337 p1.Size = p1Size 338 section1 := mgo.MsgSection{ 339 PayloadType: mgo.MsgPayload1, 340 Data: p1, 341 } 342 return generator.generateMsgOp([]mgo.MsgSection{section0, section1}, requestID) 343} 344 345func (generator *recordedOpGenerator) generateMsgOp(sections []mgo.MsgSection, requestID int32) error { 346 opMsg := mgo.MsgOp{ 347 Sections: sections, 348 } 349 recordedOp, err := generator.fetchRecordedOpsFromConn(&opMsg) 350 if err != nil { 351 return err 352 } 353 recordedOp.RawOp.Header.RequestID = requestID 354 generator.pushDriverRequestOps(recordedOp) 355 return nil 356} 357 358// generateKillCursorsOp creates a RecordedOp killCursors using the given 359// cursorIDs and pushes it to the recordedOpGenerator's channel to be executed 360// when Play() is called 361func (generator *recordedOpGenerator) generateKillCursors(cursorIDs []int64) error { 362 killCursors := mgo.KillCursorsOp{ 363 CursorIds: cursorIDs, 364 } 365 366 recordedOp, err := generator.fetchRecordedOpsFromConn(&killCursors) 367 if err != nil { 368 return err 369 } 370 generator.pushDriverRequestOps(recordedOp) 371 return nil 372} 373 374// fetchRecordedOpsFromConn runs the created mgo op through mgo and fetches its 375// result from the stubbed connection. In the case that a connection has not 376// been used before it reads two ops from the connection, the first being the 377// 'getNonce' request generated by the driver 378func (generator *recordedOpGenerator) fetchRecordedOpsFromConn(op interface{}) (*RecordedOp, error) { 379 socket, err := generator.session.AcquireSocketPrivate(true) 380 if err != nil { 381 return nil, fmt.Errorf("AcquireSocketPrivate: %v\n", err) 382 } 383 err = socket.Query(op) 384 if err != nil { 385 return nil, fmt.Errorf("Socket.Query: %v\n", err) 386 } 387 msg, err := ReadHeader(generator.serverConnection) 388 if err != nil { 389 return nil, fmt.Errorf("ReadHeader Error: %v\n", err) 390 } 391 result := RawOp{Header: *msg} 392 result.Body = make([]byte, MsgHeaderLen) 393 result.FromReader(generator.serverConnection) 394 395 recordedOp := &RecordedOp{RawOp: result, Seen: &PreciseTime{testTime}, SrcEndpoint: "a", DstEndpoint: "b", PlayedAt: &PreciseTime{}} 396 397 testTime = testTime.Add(time.Millisecond * 2) 398 return recordedOp, nil 399} 400