// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongoreplay

import (
	"fmt"
	"io"
	"time"

	mgo "github.com/10gen/llmgo"
)

// PlayCommand stores settings for the mongoreplay 'play' subcommand
//
// Each field's struct tag carries its flag name, description and default.
// NOTE(review): the tags look like they are consumed by a go-flags style
// command-line parser — confirm against the code that registers the command.
//
// Within this file: Speed scales the delay between ops, Repeat is the number
// of passes requested from the playback file reader, QueueTime bounds (in
// seconds) how far ahead of their play time ops are queued, NoPreprocess
// skips the extra pass over the file that builds the cursor ID map, and
// FullSpeed disables the queue-ahead throttling.
type PlayCommand struct {
	GlobalOpts *Options `no-flag:"true"`
	StatOptions
	PlaybackFile string  `description:"path to the playback file to play from" short:"p" long:"playback-file" required:"yes"`
	Speed        float64 `description:"multiplier for playback speed (1.0 = real-time, .5 = half-speed, 3.0 = triple-speed, etc.)" long:"speed" default:"1.0"`
	URL          string  `short:"h" long:"host" description:"Location of the host to play back against" default:"mongodb://localhost:27017"`
	Repeat       int     `long:"repeat" description:"Number of times to play the playback file" default:"1"`
	QueueTime    int     `long:"queueTime" description:"don't queue ops much further in the future than this number of seconds" default:"15"`
	NoPreprocess bool    `long:"no-preprocess" description:"don't preprocess the input file to premap data such as mongo cursorIDs"`
	Gzip         bool    `long:"gzip" description:"decompress gzipped input"`
	Collect      string  `long:"collect" description:"Stat collection format; 'format' option uses the --format string" choice:"json" choice:"format" choice:"none" default:"none"`
	FullSpeed    bool    `long:"fullSpeed" description:"run the playback as fast as possible"`
}

// queueGranularity is the number of ops Play reads between checks that it is
// not queueing ops further ahead of their play time than the queue time.
const queueGranularity = 1000

// ValidateParams validates the settings described in the PlayCommand struct.
35func (play *PlayCommand) ValidateParams(args []string) error { 36 switch { 37 case len(args) > 0: 38 return fmt.Errorf("unknown argument: %s", args[0]) 39 case play.Speed <= 0: 40 return fmt.Errorf("Invalid setting for --speed: '%v'", play.Speed) 41 case play.Repeat < 1: 42 return fmt.Errorf("Invalid setting for --repeat: '%v', value must be >=1", play.Repeat) 43 } 44 return nil 45} 46 47// Execute runs the program for the 'play' subcommand 48func (play *PlayCommand) Execute(args []string) error { 49 err := play.ValidateParams(args) 50 if err != nil { 51 return err 52 } 53 play.GlobalOpts.SetLogging() 54 55 statColl, err := newStatCollector(play.StatOptions, play.Collect, true, true) 56 if err != nil { 57 return err 58 } 59 60 if play.FullSpeed { 61 userInfoLogger.Logvf(Always, "Doing playback at full speed") 62 } else { 63 userInfoLogger.Logvf(Always, "Doing playback at %.2fx speed", play.Speed) 64 } 65 66 playbackFileReader, err := NewPlaybackFileReader(play.PlaybackFile, play.Gzip) 67 if err != nil { 68 return err 69 } 70 71 session, err := mgo.Dial(play.URL) 72 if err != nil { 73 return err 74 } 75 session.SetSocketTimeout(0) 76 77 context := NewExecutionContext(statColl, session, &ExecutionOptions{fullSpeed: play.FullSpeed, 78 driverOpsFiltered: playbackFileReader.metadata.DriverOpsFiltered}) 79 80 session.SetPoolLimit(-1) 81 82 var opChan <-chan *RecordedOp 83 var errChan <-chan error 84 85 if !play.NoPreprocess { 86 opChan, errChan = playbackFileReader.OpChan(1) 87 88 preprocessMap, err := newPreprocessCursorManager(opChan) 89 90 if err != nil { 91 return fmt.Errorf("PreprocessMap: %v", err) 92 } 93 94 err = <-errChan 95 if err != io.EOF { 96 return fmt.Errorf("OpChan: %v", err) 97 } 98 99 _, err = playbackFileReader.Seek(0, 0) 100 if err != nil { 101 return err 102 } 103 context.CursorIDMap = preprocessMap 104 } 105 106 opChan, errChan = playbackFileReader.OpChan(play.Repeat) 107 108 if err := Play(context, opChan, play.Speed, play.Repeat, 
play.QueueTime); err != nil { 109 userInfoLogger.Logvf(Always, "Play: %v\n", err) 110 } 111 112 //handle the error from the errchan 113 err = <-errChan 114 if err != nil && err != io.EOF { 115 userInfoLogger.Logvf(Always, "OpChan: %v", err) 116 } 117 return nil 118} 119 120// Play is responsible for playing ops from a RecordedOp channel to the session. 121func Play(context *ExecutionContext, 122 opChan <-chan *RecordedOp, 123 speed float64, 124 repeat int, 125 queueTime int) error { 126 127 connectionChans := make(map[int64]chan<- *RecordedOp) 128 var playbackStartTime, recordingStartTime time.Time 129 var connectionID int64 130 var opCounter int 131 for op := range opChan { 132 opCounter++ 133 if op.Seen.IsZero() { 134 return fmt.Errorf("Can't play operation found with zero-timestamp: %#v", op) 135 } 136 if recordingStartTime.IsZero() { 137 recordingStartTime = op.Seen.Time 138 playbackStartTime = time.Now() 139 } 140 141 // opDelta is the difference in time between when the file's recording 142 // began and and when this particular op is played. For the first 143 // operation in the playback, it's 0. 144 opDelta := op.Seen.Sub(recordingStartTime) 145 146 // Adjust the opDelta for playback by dividing it by playback speed setting; 147 // e.g. 2x speed means the delta is half as long. 148 scaledDelta := float64(opDelta) / (speed) 149 op.PlayAt = &PreciseTime{playbackStartTime.Add(time.Duration(int64(scaledDelta)))} 150 151 // Every queueGranularity ops make sure that we're no more then 152 // QueueTime seconds ahead Which should mean that the maximum that we're 153 // ever ahead is QueueTime seconds of ops + queueGranularity more ops. 154 // This is so that when we're at QueueTime ahead in the playback file we 155 // don't sleep after every read, and generally read and queue 156 // queueGranularity number of ops at a time and then sleep until the 157 // last read op is QueueTime ahead. 
158 if !context.fullSpeed { 159 if opCounter%queueGranularity == 0 { 160 toolDebugLogger.Logvf(DebugHigh, "Waiting to prevent excess buffering with opCounter: %v", opCounter) 161 time.Sleep(op.PlayAt.Add(time.Duration(-queueTime) * time.Second).Sub(time.Now())) 162 } 163 } 164 165 connectionChan, ok := connectionChans[op.SeenConnectionNum] 166 if !ok { 167 connectionID++ 168 connectionChan = context.newExecutionConnection(op.PlayAt.Time, connectionID) 169 connectionChans[op.SeenConnectionNum] = connectionChan 170 } 171 if op.EOF { 172 userInfoLogger.Logv(DebugLow, "EOF Seen in playback") 173 close(connectionChan) 174 delete(connectionChans, op.SeenConnectionNum) 175 } else { 176 connectionChan <- op 177 } 178 } 179 for connectionNum, connectionChan := range connectionChans { 180 close(connectionChan) 181 delete(connectionChans, connectionNum) 182 } 183 toolDebugLogger.Logvf(Info, "Waiting for connections to finish") 184 context.ConnectionChansWaitGroup.Wait() 185 186 context.StatCollector.Close() 187 toolDebugLogger.Logvf(Always, "%v ops played back in %v seconds over %v connections", opCounter, time.Now().Sub(playbackStartTime), connectionID) 188 if repeat > 1 { 189 toolDebugLogger.Logvf(Always, "%v ops per generation for %v generations", opCounter/repeat, repeat) 190 } 191 return nil 192} 193