1// Copyright (C) MongoDB, Inc. 2014-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongoreplay
8
9import (
10	"fmt"
11	"io"
12	"strings"
13	"time"
14
15	"github.com/mongodb/mongo-tools/legacy/lldb"
16	"github.com/mongodb/mongo-tools/legacy/options"
17)
18
19// PlayCommand stores settings for the mongoreplay 'play' subcommand
20type PlayCommand struct {
21	GlobalOpts *Options `no-flag:"true"`
22	StatOptions
23	PlaybackFile string       `description:"path to the playback file to play from" short:"p" long:"playback-file" required:"yes"`
24	Speed        float64      `description:"multiplier for playback speed (1.0 = real-time, .5 = half-speed, 3.0 = triple-speed, etc.)" long:"speed" default:"1.0"`
25	URL          string       `short:"h" long:"host" env:"MONGOREPLAY_HOST" description:"Location of the host to play back against" default:"mongodb://localhost:27017"`
26	Repeat       int          `long:"repeat" description:"Number of times to play the playback file" default:"1"`
27	QueueTime    int          `long:"queueTime" description:"don't queue ops much further in the future than this number of seconds" default:"15"`
28	NoPreprocess bool         `long:"no-preprocess" description:"don't preprocess the input file to premap data such as mongo cursorIDs"`
29	Gzip         bool         `long:"gzip" description:"decompress gzipped input"`
30	Collect      string       `long:"collect" description:"Stat collection format; 'format' option uses the --format string" choice:"json" choice:"format" choice:"none" default:"none"`
31	FullSpeed    bool         `long:"fullSpeed" description:"run the playback as fast as possible"`
32	SSLOpts      *options.SSL `no-flag:"true"`
33}
34
35const queueGranularity = 1000
36
37// ValidateParams validates the settings described in the PlayCommand struct.
38func (play *PlayCommand) ValidateParams(args []string) error {
39	switch {
40	case len(args) > 0:
41		return fmt.Errorf("unknown argument: %s", args[0])
42	case play.Speed <= 0:
43		return fmt.Errorf("Invalid setting for --speed: '%v'", play.Speed)
44	case play.Repeat < 1:
45		return fmt.Errorf("Invalid setting for --repeat: '%v', value must be >=1", play.Repeat)
46	}
47	return nil
48}
49
50// Execute runs the program for the 'play' subcommand
51func (play *PlayCommand) Execute(args []string) error {
52	err := play.ValidateParams(args)
53	if err != nil {
54		return err
55	}
56	play.GlobalOpts.SetLogging()
57
58	statColl, err := newStatCollector(play.StatOptions, play.Collect, true, true)
59	if err != nil {
60		return err
61	}
62
63	if play.FullSpeed {
64		userInfoLogger.Logvf(Always, "Doing playback at full speed")
65	} else {
66		userInfoLogger.Logvf(Always, "Doing playback at %.2fx speed", play.Speed)
67	}
68
69	playbackFileReader, err := NewPlaybackFileReader(play.PlaybackFile, play.Gzip)
70	if err != nil {
71		return err
72	}
73
74	// Reparse given host via ToolOptions so we can use a SessionProvider
75	// for the llmgo session.
76	toolOpts := options.New("", "", options.EnabledOptions{Connection: true, URI: true, Auth: true})
77	// SSL options must be non-nil before parsing to enable parsing ssl;
78	// play.SSLopts will be nil if SSL is not enabled
79	toolOpts.SSL = play.SSLOpts
80	if !(strings.HasPrefix(play.URL, "mongodb://") || strings.HasPrefix(play.URL, "mongodb+srv://")) {
81		play.URL = fmt.Sprintf("mongodb://%s", play.URL)
82	}
83	_, err = toolOpts.ParseArgs([]string{"--uri", play.URL})
84
85	if err != nil {
86		return err
87	}
88
89	sp, err := lldb.NewSessionProvider(*toolOpts)
90	if err != nil {
91		return err
92	}
93
94	userInfoLogger.Logv(DebugLow, "Initializing a session")
95	session, err := sp.GetSession()
96	if err != nil {
97		return err
98	}
99	session.SetSocketTimeout(0)
100
101	context := NewExecutionContext(statColl, session, &ExecutionOptions{fullSpeed: play.FullSpeed,
102		driverOpsFiltered: playbackFileReader.metadata.DriverOpsFiltered})
103
104	session.SetPoolLimit(-1)
105
106	var opChan <-chan *RecordedOp
107	var errChan <-chan error
108
109	if !play.NoPreprocess {
110		opChan, errChan = playbackFileReader.OpChan(1)
111
112		preprocessMap, err := newPreprocessCursorManager(opChan)
113
114		if err != nil {
115			return fmt.Errorf("PreprocessMap: %v", err)
116		}
117
118		err = <-errChan
119		if err != io.EOF {
120			return fmt.Errorf("OpChan: %v", err)
121		}
122
123		_, err = playbackFileReader.Seek(0, 0)
124		if err != nil {
125			return err
126		}
127		context.CursorIDMap = preprocessMap
128	}
129
130	opChan, errChan = playbackFileReader.OpChan(play.Repeat)
131
132	if err := Play(context, opChan, play.Speed, play.Repeat, play.QueueTime); err != nil {
133		userInfoLogger.Logvf(Always, "Play: %v\n", err)
134	}
135
136	//handle the error from the errchan
137	err = <-errChan
138	if err != nil && err != io.EOF {
139		userInfoLogger.Logvf(Always, "OpChan: %v", err)
140	}
141	return nil
142}
143
144// Play is responsible for playing ops from a RecordedOp channel to the session.
145func Play(context *ExecutionContext,
146	opChan <-chan *RecordedOp,
147	speed float64,
148	repeat int,
149	queueTime int) error {
150
151	connectionChans := make(map[int64]chan<- *RecordedOp)
152	var playbackStartTime, recordingStartTime time.Time
153	var connectionID int64
154	var opCounter int
155	for op := range opChan {
156		opCounter++
157		if op.Seen.IsZero() {
158			return fmt.Errorf("Can't play operation found with zero-timestamp: %#v", op)
159		}
160		if recordingStartTime.IsZero() {
161			recordingStartTime = op.Seen.Time
162			playbackStartTime = time.Now()
163		}
164
165		// opDelta is the difference in time between when the file's recording
166		// began and and when this particular op is played. For the first
167		// operation in the playback, it's 0.
168		opDelta := op.Seen.Sub(recordingStartTime)
169
170		// Adjust the opDelta for playback by dividing it by playback speed setting;
171		// e.g. 2x speed means the delta is half as long.
172		scaledDelta := float64(opDelta) / (speed)
173		op.PlayAt = &PreciseTime{playbackStartTime.Add(time.Duration(int64(scaledDelta)))}
174
175		// Every queueGranularity ops make sure that we're no more then
176		// QueueTime seconds ahead Which should mean that the maximum that we're
177		// ever ahead is QueueTime seconds of ops + queueGranularity more ops.
178		// This is so that when we're at QueueTime ahead in the playback file we
179		// don't sleep after every read, and generally read and queue
180		// queueGranularity number of ops at a time and then sleep until the
181		// last read op is QueueTime ahead.
182		if !context.fullSpeed {
183			if opCounter%queueGranularity == 0 {
184				toolDebugLogger.Logvf(DebugHigh, "Waiting to prevent excess buffering with opCounter: %v", opCounter)
185				time.Sleep(op.PlayAt.Add(time.Duration(-queueTime) * time.Second).Sub(time.Now()))
186			}
187		}
188
189		connectionChan, ok := connectionChans[op.SeenConnectionNum]
190		if !ok {
191			connectionID++
192			connectionChan = context.newExecutionConnection(op.PlayAt.Time, connectionID)
193			connectionChans[op.SeenConnectionNum] = connectionChan
194		}
195		if op.EOF {
196			userInfoLogger.Logv(DebugLow, "EOF Seen in playback")
197			close(connectionChan)
198			delete(connectionChans, op.SeenConnectionNum)
199		} else {
200			connectionChan <- op
201		}
202	}
203	for connectionNum, connectionChan := range connectionChans {
204		close(connectionChan)
205		delete(connectionChans, connectionNum)
206	}
207	toolDebugLogger.Logvf(Info, "Waiting for connections to finish")
208	context.ConnectionChansWaitGroup.Wait()
209
210	context.StatCollector.Close()
211	toolDebugLogger.Logvf(Always, "%v ops played back in %v seconds over %v connections", opCounter, time.Now().Sub(playbackStartTime), connectionID)
212	if repeat > 1 {
213		toolDebugLogger.Logvf(Always, "%v ops per generation for %v generations", opCounter/repeat, repeat)
214	}
215	return nil
216}
217