1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongoreplay
8
9import (
10	"fmt"
11	"io"
12	"time"
13
14	mgo "github.com/10gen/llmgo"
15)
16
// PlayCommand stores settings for the mongoreplay 'play' subcommand.
//
// The struct tags declare the command-line flags and their defaults. Within
// this file the fields are used as follows:
//   - GlobalOpts: Execute calls its SetLogging method before doing any work.
//   - StatOptions and Collect: passed to newStatCollector to build the stat
//     collector handed to the execution context.
//   - PlaybackFile and Gzip: passed to NewPlaybackFileReader to open the input.
//   - Speed: playback speed multiplier; Play divides each op's recorded time
//     offset by it. ValidateParams rejects non-positive values.
//   - URL: the target dialed by Execute.
//   - Repeat: number of passes over the playback file, forwarded to OpChan and
//     Play. ValidateParams rejects values below 1.
//   - QueueTime: how many seconds ahead of the playback clock Play is allowed
//     to read ops.
//   - NoPreprocess: when set, Execute skips the initial pass over the file that
//     builds the cursor-ID map.
//   - FullSpeed: forwarded through ExecutionOptions; when set, Play does not
//     apply its read-ahead throttling.
type PlayCommand struct {
	GlobalOpts *Options `no-flag:"true"`
	StatOptions
	PlaybackFile string  `description:"path to the playback file to play from" short:"p" long:"playback-file" required:"yes"`
	Speed        float64 `description:"multiplier for playback speed (1.0 = real-time, .5 = half-speed, 3.0 = triple-speed, etc.)" long:"speed" default:"1.0"`
	URL          string  `short:"h" long:"host" description:"Location of the host to play back against" default:"mongodb://localhost:27017"`
	Repeat       int     `long:"repeat" description:"Number of times to play the playback file" default:"1"`
	QueueTime    int     `long:"queueTime" description:"don't queue ops much further in the future than this number of seconds" default:"15"`
	NoPreprocess bool    `long:"no-preprocess" description:"don't preprocess the input file to premap data such as mongo cursorIDs"`
	Gzip         bool    `long:"gzip" description:"decompress gzipped input"`
	Collect      string  `long:"collect" description:"Stat collection format; 'format' option uses the --format string" choice:"json" choice:"format" choice:"none" default:"none"`
	FullSpeed    bool    `long:"fullSpeed" description:"run the playback as fast as possible"`
}
31
// queueGranularity is the number of ops Play reads between successive checks
// that it has not queued ops too far ahead of the playback clock.
const queueGranularity = 1000
33
34// ValidateParams validates the settings described in the PlayCommand struct.
35func (play *PlayCommand) ValidateParams(args []string) error {
36	switch {
37	case len(args) > 0:
38		return fmt.Errorf("unknown argument: %s", args[0])
39	case play.Speed <= 0:
40		return fmt.Errorf("Invalid setting for --speed: '%v'", play.Speed)
41	case play.Repeat < 1:
42		return fmt.Errorf("Invalid setting for --repeat: '%v', value must be >=1", play.Repeat)
43	}
44	return nil
45}
46
47// Execute runs the program for the 'play' subcommand
48func (play *PlayCommand) Execute(args []string) error {
49	err := play.ValidateParams(args)
50	if err != nil {
51		return err
52	}
53	play.GlobalOpts.SetLogging()
54
55	statColl, err := newStatCollector(play.StatOptions, play.Collect, true, true)
56	if err != nil {
57		return err
58	}
59
60	if play.FullSpeed {
61		userInfoLogger.Logvf(Always, "Doing playback at full speed")
62	} else {
63		userInfoLogger.Logvf(Always, "Doing playback at %.2fx speed", play.Speed)
64	}
65
66	playbackFileReader, err := NewPlaybackFileReader(play.PlaybackFile, play.Gzip)
67	if err != nil {
68		return err
69	}
70
71	session, err := mgo.Dial(play.URL)
72	if err != nil {
73		return err
74	}
75	session.SetSocketTimeout(0)
76
77	context := NewExecutionContext(statColl, session, &ExecutionOptions{fullSpeed: play.FullSpeed,
78		driverOpsFiltered: playbackFileReader.metadata.DriverOpsFiltered})
79
80	session.SetPoolLimit(-1)
81
82	var opChan <-chan *RecordedOp
83	var errChan <-chan error
84
85	if !play.NoPreprocess {
86		opChan, errChan = playbackFileReader.OpChan(1)
87
88		preprocessMap, err := newPreprocessCursorManager(opChan)
89
90		if err != nil {
91			return fmt.Errorf("PreprocessMap: %v", err)
92		}
93
94		err = <-errChan
95		if err != io.EOF {
96			return fmt.Errorf("OpChan: %v", err)
97		}
98
99		_, err = playbackFileReader.Seek(0, 0)
100		if err != nil {
101			return err
102		}
103		context.CursorIDMap = preprocessMap
104	}
105
106	opChan, errChan = playbackFileReader.OpChan(play.Repeat)
107
108	if err := Play(context, opChan, play.Speed, play.Repeat, play.QueueTime); err != nil {
109		userInfoLogger.Logvf(Always, "Play: %v\n", err)
110	}
111
112	//handle the error from the errchan
113	err = <-errChan
114	if err != nil && err != io.EOF {
115		userInfoLogger.Logvf(Always, "OpChan: %v", err)
116	}
117	return nil
118}
119
120// Play is responsible for playing ops from a RecordedOp channel to the session.
121func Play(context *ExecutionContext,
122	opChan <-chan *RecordedOp,
123	speed float64,
124	repeat int,
125	queueTime int) error {
126
127	connectionChans := make(map[int64]chan<- *RecordedOp)
128	var playbackStartTime, recordingStartTime time.Time
129	var connectionID int64
130	var opCounter int
131	for op := range opChan {
132		opCounter++
133		if op.Seen.IsZero() {
134			return fmt.Errorf("Can't play operation found with zero-timestamp: %#v", op)
135		}
136		if recordingStartTime.IsZero() {
137			recordingStartTime = op.Seen.Time
138			playbackStartTime = time.Now()
139		}
140
141		// opDelta is the difference in time between when the file's recording
142		// began and and when this particular op is played. For the first
143		// operation in the playback, it's 0.
144		opDelta := op.Seen.Sub(recordingStartTime)
145
146		// Adjust the opDelta for playback by dividing it by playback speed setting;
147		// e.g. 2x speed means the delta is half as long.
148		scaledDelta := float64(opDelta) / (speed)
149		op.PlayAt = &PreciseTime{playbackStartTime.Add(time.Duration(int64(scaledDelta)))}
150
151		// Every queueGranularity ops make sure that we're no more then
152		// QueueTime seconds ahead Which should mean that the maximum that we're
153		// ever ahead is QueueTime seconds of ops + queueGranularity more ops.
154		// This is so that when we're at QueueTime ahead in the playback file we
155		// don't sleep after every read, and generally read and queue
156		// queueGranularity number of ops at a time and then sleep until the
157		// last read op is QueueTime ahead.
158		if !context.fullSpeed {
159			if opCounter%queueGranularity == 0 {
160				toolDebugLogger.Logvf(DebugHigh, "Waiting to prevent excess buffering with opCounter: %v", opCounter)
161				time.Sleep(op.PlayAt.Add(time.Duration(-queueTime) * time.Second).Sub(time.Now()))
162			}
163		}
164
165		connectionChan, ok := connectionChans[op.SeenConnectionNum]
166		if !ok {
167			connectionID++
168			connectionChan = context.newExecutionConnection(op.PlayAt.Time, connectionID)
169			connectionChans[op.SeenConnectionNum] = connectionChan
170		}
171		if op.EOF {
172			userInfoLogger.Logv(DebugLow, "EOF Seen in playback")
173			close(connectionChan)
174			delete(connectionChans, op.SeenConnectionNum)
175		} else {
176			connectionChan <- op
177		}
178	}
179	for connectionNum, connectionChan := range connectionChans {
180		close(connectionChan)
181		delete(connectionChans, connectionNum)
182	}
183	toolDebugLogger.Logvf(Info, "Waiting for connections to finish")
184	context.ConnectionChansWaitGroup.Wait()
185
186	context.StatCollector.Close()
187	toolDebugLogger.Logvf(Always, "%v ops played back in %v seconds over %v connections", opCounter, time.Now().Sub(playbackStartTime), connectionID)
188	if repeat > 1 {
189		toolDebugLogger.Logvf(Always, "%v ops per generation for %v generations", opCounter/repeat, repeat)
190	}
191	return nil
192}
193