// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongoreplay

import (
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/mongodb/mongo-tools/legacy/lldb"
	"github.com/mongodb/mongo-tools/legacy/options"
)

// PlayCommand stores settings for the mongoreplay 'play' subcommand.
//
// The struct tags describe the command-line flags; fields tagged no-flag are
// not exposed as flags and are expected to be filled in by the caller before
// Execute runs.
type PlayCommand struct {
	// GlobalOpts is used by Execute to set up logging (GlobalOpts.SetLogging).
	GlobalOpts *Options `no-flag:"true"`
	// StatOptions is embedded and handed to newStatCollector by Execute.
	StatOptions
	// PlaybackFile is the path passed to NewPlaybackFileReader.
	PlaybackFile string `description:"path to the playback file to play from" short:"p" long:"playback-file" required:"yes"`
	// Speed is the playback speed multiplier. ValidateParams rejects values
	// that are not greater than zero; Play divides recorded time offsets by it.
	Speed float64 `description:"multiplier for playback speed (1.0 = real-time, .5 = half-speed, 3.0 = triple-speed, etc.)" long:"speed" default:"1.0"`
	// URL is the target host. Execute prepends "mongodb://" when the value has
	// neither a "mongodb://" nor a "mongodb+srv://" prefix.
	URL string `short:"h" long:"host" env:"MONGOREPLAY_HOST" description:"Location of the host to play back against" default:"mongodb://localhost:27017"`
	// Repeat is the number of passes over the playback file; ValidateParams
	// requires it to be at least 1.
	Repeat int `long:"repeat" description:"Number of times to play the playback file" default:"1"`
	// QueueTime is a number of seconds; Play uses it to limit how far ahead of
	// the playback clock ops are read and queued.
	QueueTime int `long:"queueTime" description:"don't queue ops much further in the future than this number of seconds" default:"15"`
	// NoPreprocess disables the extra pass Execute otherwise makes over the
	// file to build the cursor ID map.
	NoPreprocess bool `long:"no-preprocess" description:"don't preprocess the input file to premap data such as mongo cursorIDs"`
	// Gzip is passed to NewPlaybackFileReader.
	Gzip bool `long:"gzip" description:"decompress gzipped input"`
	// Collect selects the stat collection format passed to newStatCollector.
	Collect string `long:"collect" description:"Stat collection format; 'format' option uses the --format string" choice:"json" choice:"format" choice:"none" default:"none"`
	// FullSpeed is forwarded as ExecutionOptions.fullSpeed; when set, Play
	// skips the sleep it otherwise uses to avoid queueing too far ahead.
	FullSpeed bool `long:"fullSpeed" description:"run the playback as fast as possible"`
	// SSLOpts is assigned to the tool options before the URI is parsed; it is
	// nil when SSL is not enabled.
	SSLOpts *options.SSL `no-flag:"true"`
}

// queueGranularity is the number of ops Play reads between checks that it is
// not queueing ops too far ahead of the playback clock.
const queueGranularity = 1000

// ValidateParams validates the settings described in the PlayCommand struct.
38func (play *PlayCommand) ValidateParams(args []string) error { 39 switch { 40 case len(args) > 0: 41 return fmt.Errorf("unknown argument: %s", args[0]) 42 case play.Speed <= 0: 43 return fmt.Errorf("Invalid setting for --speed: '%v'", play.Speed) 44 case play.Repeat < 1: 45 return fmt.Errorf("Invalid setting for --repeat: '%v', value must be >=1", play.Repeat) 46 } 47 return nil 48} 49 50// Execute runs the program for the 'play' subcommand 51func (play *PlayCommand) Execute(args []string) error { 52 err := play.ValidateParams(args) 53 if err != nil { 54 return err 55 } 56 play.GlobalOpts.SetLogging() 57 58 statColl, err := newStatCollector(play.StatOptions, play.Collect, true, true) 59 if err != nil { 60 return err 61 } 62 63 if play.FullSpeed { 64 userInfoLogger.Logvf(Always, "Doing playback at full speed") 65 } else { 66 userInfoLogger.Logvf(Always, "Doing playback at %.2fx speed", play.Speed) 67 } 68 69 playbackFileReader, err := NewPlaybackFileReader(play.PlaybackFile, play.Gzip) 70 if err != nil { 71 return err 72 } 73 74 // Reparse given host via ToolOptions so we can use a SessionProvider 75 // for the llmgo session. 
76 toolOpts := options.New("", "", options.EnabledOptions{Connection: true, URI: true, Auth: true}) 77 // SSL options must be non-nil before parsing to enable parsing ssl; 78 // play.SSLopts will be nil if SSL is not enabled 79 toolOpts.SSL = play.SSLOpts 80 if !(strings.HasPrefix(play.URL, "mongodb://") || strings.HasPrefix(play.URL, "mongodb+srv://")) { 81 play.URL = fmt.Sprintf("mongodb://%s", play.URL) 82 } 83 _, err = toolOpts.ParseArgs([]string{"--uri", play.URL}) 84 85 if err != nil { 86 return err 87 } 88 89 sp, err := lldb.NewSessionProvider(*toolOpts) 90 if err != nil { 91 return err 92 } 93 94 userInfoLogger.Logv(DebugLow, "Initializing a session") 95 session, err := sp.GetSession() 96 if err != nil { 97 return err 98 } 99 session.SetSocketTimeout(0) 100 101 context := NewExecutionContext(statColl, session, &ExecutionOptions{fullSpeed: play.FullSpeed, 102 driverOpsFiltered: playbackFileReader.metadata.DriverOpsFiltered}) 103 104 session.SetPoolLimit(-1) 105 106 var opChan <-chan *RecordedOp 107 var errChan <-chan error 108 109 if !play.NoPreprocess { 110 opChan, errChan = playbackFileReader.OpChan(1) 111 112 preprocessMap, err := newPreprocessCursorManager(opChan) 113 114 if err != nil { 115 return fmt.Errorf("PreprocessMap: %v", err) 116 } 117 118 err = <-errChan 119 if err != io.EOF { 120 return fmt.Errorf("OpChan: %v", err) 121 } 122 123 _, err = playbackFileReader.Seek(0, 0) 124 if err != nil { 125 return err 126 } 127 context.CursorIDMap = preprocessMap 128 } 129 130 opChan, errChan = playbackFileReader.OpChan(play.Repeat) 131 132 if err := Play(context, opChan, play.Speed, play.Repeat, play.QueueTime); err != nil { 133 userInfoLogger.Logvf(Always, "Play: %v\n", err) 134 } 135 136 //handle the error from the errchan 137 err = <-errChan 138 if err != nil && err != io.EOF { 139 userInfoLogger.Logvf(Always, "OpChan: %v", err) 140 } 141 return nil 142} 143 144// Play is responsible for playing ops from a RecordedOp channel to the session. 
145func Play(context *ExecutionContext, 146 opChan <-chan *RecordedOp, 147 speed float64, 148 repeat int, 149 queueTime int) error { 150 151 connectionChans := make(map[int64]chan<- *RecordedOp) 152 var playbackStartTime, recordingStartTime time.Time 153 var connectionID int64 154 var opCounter int 155 for op := range opChan { 156 opCounter++ 157 if op.Seen.IsZero() { 158 return fmt.Errorf("Can't play operation found with zero-timestamp: %#v", op) 159 } 160 if recordingStartTime.IsZero() { 161 recordingStartTime = op.Seen.Time 162 playbackStartTime = time.Now() 163 } 164 165 // opDelta is the difference in time between when the file's recording 166 // began and and when this particular op is played. For the first 167 // operation in the playback, it's 0. 168 opDelta := op.Seen.Sub(recordingStartTime) 169 170 // Adjust the opDelta for playback by dividing it by playback speed setting; 171 // e.g. 2x speed means the delta is half as long. 172 scaledDelta := float64(opDelta) / (speed) 173 op.PlayAt = &PreciseTime{playbackStartTime.Add(time.Duration(int64(scaledDelta)))} 174 175 // Every queueGranularity ops make sure that we're no more then 176 // QueueTime seconds ahead Which should mean that the maximum that we're 177 // ever ahead is QueueTime seconds of ops + queueGranularity more ops. 178 // This is so that when we're at QueueTime ahead in the playback file we 179 // don't sleep after every read, and generally read and queue 180 // queueGranularity number of ops at a time and then sleep until the 181 // last read op is QueueTime ahead. 
182 if !context.fullSpeed { 183 if opCounter%queueGranularity == 0 { 184 toolDebugLogger.Logvf(DebugHigh, "Waiting to prevent excess buffering with opCounter: %v", opCounter) 185 time.Sleep(op.PlayAt.Add(time.Duration(-queueTime) * time.Second).Sub(time.Now())) 186 } 187 } 188 189 connectionChan, ok := connectionChans[op.SeenConnectionNum] 190 if !ok { 191 connectionID++ 192 connectionChan = context.newExecutionConnection(op.PlayAt.Time, connectionID) 193 connectionChans[op.SeenConnectionNum] = connectionChan 194 } 195 if op.EOF { 196 userInfoLogger.Logv(DebugLow, "EOF Seen in playback") 197 close(connectionChan) 198 delete(connectionChans, op.SeenConnectionNum) 199 } else { 200 connectionChan <- op 201 } 202 } 203 for connectionNum, connectionChan := range connectionChans { 204 close(connectionChan) 205 delete(connectionChans, connectionNum) 206 } 207 toolDebugLogger.Logvf(Info, "Waiting for connections to finish") 208 context.ConnectionChansWaitGroup.Wait() 209 210 context.StatCollector.Close() 211 toolDebugLogger.Logvf(Always, "%v ops played back in %v seconds over %v connections", opCounter, time.Now().Sub(playbackStartTime), connectionID) 212 if repeat > 1 { 213 toolDebugLogger.Logvf(Always, "%v ops per generation for %v generations", opCounter/repeat, repeat) 214 } 215 return nil 216} 217