1package mongoreplay
2
3import (
4	"fmt"
5	"time"
6
7	mgo "github.com/10gen/llmgo"
8	"github.com/10gen/llmgo/bson"
9)
10
11// recordedOpGenerator maintains a pair of connection stubs and channel to allow
12// ops to be generated by the driver and passed to a channel
13type recordedOpGenerator struct {
14	session          *SessionStub
15	serverConnection *ConnStub
16	opChan           chan *RecordedOp
17}
18
19func newRecordedOpGenerator() *recordedOpGenerator {
20	session := SessionStub{}
21	var serverConnection ConnStub
22	serverConnection, session.connection = newTwoSidedConn()
23	opChan := make(chan *RecordedOp, 1000)
24
25	return &recordedOpGenerator{
26		session:          &session,
27		serverConnection: &serverConnection,
28		opChan:           opChan,
29	}
30}
31
32// pushDriverRequestOps takes the pair of ops that the driver generator (the
33// nonce and the main op) and places them in the channel
34func (generator *recordedOpGenerator) pushDriverRequestOps(recordedOp *RecordedOp) {
35	generator.opChan <- recordedOp
36}
37
38type driverRequestOps struct {
39	nonce *RecordedOp
40	op    *RecordedOp
41}
42
43func getmoreArgsHelper(cursorID int64, limit int32) bson.D {
44	var getmoreArgs bson.D
45	if limit > 0 {
46		getmoreArgs = bson.D{{"getMore", cursorID}, {"collection", testCollection}, {"batchSize", limit}}
47	} else {
48		getmoreArgs = bson.D{{"getMore", cursorID}, {"collection", testCollection}}
49	}
50	return getmoreArgs
51}
52
53func findArgsHelper(filter interface{}, limit int32) bson.D {
54	var findArgs bson.D
55	if limit > 0 {
56		findArgs = bson.D{{"find", testCollection}, {"filter", filter}, {"batchSize", limit}}
57	} else {
58		findArgs = bson.D{{"find", testCollection}, {"filter", filter}}
59	}
60	return findArgs
61}
62
63func commandReplyArgsHelper(cursorID int64) interface{} {
64	commandReply := &struct {
65		Cursor struct {
66			ID int64 `bson:"id"`
67		} `bson:"cursor"`
68	}{}
69	commandReply.Cursor.ID = cursorID
70	return commandReply
71}
72
73// generateInsert creates a RecordedOp insert using the given documents and
74// pushes it to the recordedOpGenerator's channel to be executed when Play() is
75// called
76func (generator *recordedOpGenerator) generateInsert(docs []interface{}) error {
77	insert := mgo.InsertOp{Collection: fmt.Sprintf("%s.%s", testDB, testCollection),
78		Documents: docs,
79		Flags:     0,
80	}
81	recordedOp, err := generator.fetchRecordedOpsFromConn(&insert)
82	if err != nil {
83		return err
84	}
85	generator.pushDriverRequestOps(recordedOp)
86	return nil
87
88}
89
90func (generator *recordedOpGenerator) generateGetLastError() error {
91	query := bson.D{{"getLastError", 1}}
92	getLastError := mgo.QueryOp{
93		Collection: "admin.$cmd",
94		Query:      query,
95		Limit:      -1,
96		Flags:      0,
97	}
98	recordedOp, err := generator.fetchRecordedOpsFromConn(&getLastError)
99	if err != nil {
100		return err
101	}
102	generator.pushDriverRequestOps(recordedOp)
103	return nil
104
105}
106
107func (generator *recordedOpGenerator) generateInsertHelper(name string, startFrom, numInserts int) error {
108	for i := 0; i < numInserts; i++ {
109		doc := testDoc{
110			Name:           name,
111			DocumentNumber: i + startFrom,
112			Success:        true,
113		}
114		err := generator.generateInsert([]interface{}{doc})
115		if err != nil {
116			return err
117		}
118	}
119	return nil
120}
121
122// generateQuery creates a RecordedOp query using the given selector, limit, and
123// requestID, and pushes it to the recordedOpGenerator's channel to be executed
124// when Play() is called
125func (generator *recordedOpGenerator) generateQuery(querySelection interface{}, limit int32, requestID int32) error {
126	query := mgo.QueryOp{
127		Flags:      0,
128		HasOptions: true,
129		Skip:       0,
130		Limit:      limit,
131		Selector:   bson.D{},
132		Query:      querySelection,
133		Collection: fmt.Sprintf("%s.%s", testDB, testCollection),
134		Options:    mgo.QueryWrapper{},
135	}
136
137	recordedOp, err := generator.fetchRecordedOpsFromConn(&query)
138	if err != nil {
139		return err
140	}
141	recordedOp.RawOp.Header.RequestID = requestID
142	generator.pushDriverRequestOps(recordedOp)
143	return nil
144}
145
146func (generator *recordedOpGenerator) generateCommandOpInsertHelper(name string, startFrom, numInserts int) error {
147	for i := 0; i < numInserts; i++ {
148		doc := testDoc{
149			Name:           name,
150			DocumentNumber: i + startFrom,
151			Success:        true,
152		}
153		commandArgs := bson.D{{"documents", []testDoc{doc}}, {"insert", testCollection}}
154		err := generator.generateCommandOp("insert", commandArgs, 0)
155		if err != nil {
156			return err
157		}
158	}
159	return nil
160}
161
162func (generator *recordedOpGenerator) generateCommandFind(filter interface{}, limit int32, requestID int32) error {
163	findArgs := findArgsHelper(filter, limit)
164	return generator.generateCommandOp("find", findArgs, requestID)
165}
166
167func (generator *recordedOpGenerator) generateCommandGetMore(cursorID int64, limit int32) error {
168	getmoreArgs := getmoreArgsHelper(cursorID, limit)
169	return generator.generateCommandOp("getMore", getmoreArgs, 0)
170}
171func (generator *recordedOpGenerator) generateCommandReply(responseTo int32, cursorID int64) error {
172	commandReply := commandReplyArgsHelper(cursorID)
173
174	reply := mgo.CommandReplyOp{
175		Metadata:     &struct{}{},
176		CommandReply: commandReply,
177		OutputDocs:   []interface{}{},
178	}
179
180	recordedOp, err := generator.fetchRecordedOpsFromConn(&reply)
181	if err != nil {
182		return err
183	}
184
185	recordedOp.RawOp.Header.ResponseTo = responseTo
186	tempEnd := recordedOp.SrcEndpoint
187	recordedOp.SrcEndpoint = recordedOp.DstEndpoint
188	recordedOp.DstEndpoint = tempEnd
189	generator.pushDriverRequestOps(recordedOp)
190	return nil
191}
192
193func (generator *recordedOpGenerator) generateCommandOp(commandName string, commandArgs bson.D, requestID int32) error {
194	command := mgo.CommandOp{
195		Database:    testDB,
196		CommandName: commandName,
197		Metadata:    bson.D{},
198		CommandArgs: commandArgs,
199		InputDocs:   make([]interface{}, 0),
200	}
201
202	recordedOp, err := generator.fetchRecordedOpsFromConn(&command)
203	if err != nil {
204		return err
205	}
206	recordedOp.RawOp.Header.RequestID = requestID
207	generator.pushDriverRequestOps(recordedOp)
208	return nil
209}
210
211// generateGetMore creates a RecordedOp getMore using the given cursorID and
212// limit and pushes it to the recordedOpGenerator's channel to be executed when
213// Play() is called
214func (generator *recordedOpGenerator) generateGetMore(cursorID int64, limit int32) error {
215	getMore := mgo.GetMoreOp{
216		Collection: fmt.Sprintf("%s.%s", testDB, testCollection),
217		CursorId:   cursorID,
218		Limit:      limit,
219	}
220
221	recordedOp, err := generator.fetchRecordedOpsFromConn(&getMore)
222	if err != nil {
223		return err
224	}
225	generator.pushDriverRequestOps(recordedOp)
226	return nil
227}
228
229// generateReply creates a RecordedOp reply using the given responseTo,
230// cursorID, and firstDOc, and pushes it to the recordedOpGenerator's channel to
231// be executed when Play() is called
232func (generator *recordedOpGenerator) generateReply(responseTo int32, cursorID int64) error {
233	reply := mgo.ReplyOp{
234		Flags:     0,
235		CursorId:  cursorID,
236		FirstDoc:  0,
237		ReplyDocs: 5,
238	}
239
240	recordedOp, err := generator.fetchRecordedOpsFromConn(&reply)
241	if err != nil {
242		return err
243	}
244
245	recordedOp.RawOp.Header.ResponseTo = responseTo
246	SetInt64(recordedOp.RawOp.Body, 4, cursorID) // change the cursorID field in the RawOp.Body
247	tempEnd := recordedOp.SrcEndpoint
248	recordedOp.SrcEndpoint = recordedOp.DstEndpoint
249	recordedOp.DstEndpoint = tempEnd
250	generator.pushDriverRequestOps(recordedOp)
251	return nil
252}
253
254func (generator *recordedOpGenerator) generateMsgOpInsertHelper(name string, startFrom, numInserts int) error {
255	for i := 0; i < numInserts; i++ {
256		doc := testDoc{
257			Name:           name,
258			DocumentNumber: i + startFrom,
259			Success:        true,
260		}
261		err := generator.generateMsgOpAgainstCollection("insert", "documents", []interface{}{doc}, 0)
262		if err != nil {
263			return err
264		}
265	}
266	return nil
267}
268
269func (generator *recordedOpGenerator) generateMsgOpGetMore(cursorID int64, limit int32) error {
270	getmoreArgs := getmoreArgsHelper(cursorID, limit)
271	getmoreArgs = append(getmoreArgs, bson.DocElem{"$db", testDB})
272	section := mgo.MsgSection{
273		PayloadType: mgo.MsgPayload0,
274		Data:        getmoreArgs,
275	}
276	return generator.generateMsgOp([]mgo.MsgSection{section}, 0)
277}
278
279func (generator *recordedOpGenerator) generateMsgOpFind(filter interface{}, limit, requestID int32) error {
280	findArgs := findArgsHelper(filter, limit)
281	findArgs = append(findArgs, bson.DocElem{"$db", testDB})
282	section := mgo.MsgSection{
283		PayloadType: mgo.MsgPayload0,
284		Data:        findArgs,
285	}
286	return generator.generateMsgOp([]mgo.MsgSection{section}, requestID)
287}
288
289func (generator *recordedOpGenerator) generateMsgOpReply(responseTo int32, cursorID int64) error {
290	commandReply := commandReplyArgsHelper(cursorID)
291	commandReplyAsSlice, err := bson.Marshal(commandReply)
292	if err != nil {
293		return err
294	}
295	commandReplyAsRaw := &bson.Raw{}
296
297	err = bson.Unmarshal(commandReplyAsSlice, commandReplyAsRaw)
298	if err != nil {
299		return err
300	}
301
302	replySection := mgo.MsgSection{
303		PayloadType: mgo.MsgPayload0,
304		Data:        commandReplyAsRaw,
305	}
306
307	msgOpReply := mgo.MsgOp{
308		Sections: []mgo.MsgSection{replySection},
309	}
310
311	recordedOp, err := generator.fetchRecordedOpsFromConn(&msgOpReply)
312	if err != nil {
313		return err
314	}
315
316	recordedOp.RawOp.Header.ResponseTo = responseTo
317	tempEnd := recordedOp.SrcEndpoint
318	recordedOp.SrcEndpoint = recordedOp.DstEndpoint
319	recordedOp.DstEndpoint = tempEnd
320	generator.pushDriverRequestOps(recordedOp)
321	return nil
322}
323
324func (generator *recordedOpGenerator) generateMsgOpAgainstCollection(opName, identifier string, docs []interface{}, requestID int32) error {
325	section0 := mgo.MsgSection{
326		PayloadType: mgo.MsgPayload0,
327		Data:        bson.D{{opName, testCollection}, {"$db", testDB}},
328	}
329	p1 := mgo.PayloadType1{
330		Identifier: identifier,
331		Docs:       docs,
332	}
333	p1Size, err := p1.CalculateSize()
334	if err != nil {
335		return err
336	}
337	p1.Size = p1Size
338	section1 := mgo.MsgSection{
339		PayloadType: mgo.MsgPayload1,
340		Data:        p1,
341	}
342	return generator.generateMsgOp([]mgo.MsgSection{section0, section1}, requestID)
343}
344
345func (generator *recordedOpGenerator) generateMsgOp(sections []mgo.MsgSection, requestID int32) error {
346	opMsg := mgo.MsgOp{
347		Sections: sections,
348	}
349	recordedOp, err := generator.fetchRecordedOpsFromConn(&opMsg)
350	if err != nil {
351		return err
352	}
353	recordedOp.RawOp.Header.RequestID = requestID
354	generator.pushDriverRequestOps(recordedOp)
355	return nil
356}
357
358// generateKillCursorsOp creates a RecordedOp killCursors using the given
359// cursorIDs and pushes it to the recordedOpGenerator's channel to be executed
360// when Play() is called
361func (generator *recordedOpGenerator) generateKillCursors(cursorIDs []int64) error {
362	killCursors := mgo.KillCursorsOp{
363		CursorIds: cursorIDs,
364	}
365
366	recordedOp, err := generator.fetchRecordedOpsFromConn(&killCursors)
367	if err != nil {
368		return err
369	}
370	generator.pushDriverRequestOps(recordedOp)
371	return nil
372}
373
374// fetchRecordedOpsFromConn runs the created mgo op through mgo and fetches its
375// result from the stubbed connection. In the case that a connection has not
376// been used before it reads two ops from the connection, the first being the
377// 'getNonce' request generated by the driver
378func (generator *recordedOpGenerator) fetchRecordedOpsFromConn(op interface{}) (*RecordedOp, error) {
379	socket, err := generator.session.AcquireSocketPrivate(true)
380	if err != nil {
381		return nil, fmt.Errorf("AcquireSocketPrivate: %v\n", err)
382	}
383	err = socket.Query(op)
384	if err != nil {
385		return nil, fmt.Errorf("Socket.Query: %v\n", err)
386	}
387	msg, err := ReadHeader(generator.serverConnection)
388	if err != nil {
389		return nil, fmt.Errorf("ReadHeader Error: %v\n", err)
390	}
391	result := RawOp{Header: *msg}
392	result.Body = make([]byte, MsgHeaderLen)
393	result.FromReader(generator.serverConnection)
394
395	recordedOp := &RecordedOp{RawOp: result, Seen: &PreciseTime{testTime}, SrcEndpoint: "a", DstEndpoint: "b", PlayedAt: &PreciseTime{}}
396
397	testTime = testTime.Add(time.Millisecond * 2)
398	return recordedOp, nil
399}
400