1package server 2 3import ( 4 "encoding/binary" 5 "fmt" 6 "net" 7 "strconv" 8 "strings" 9 "time" 10 11 "github.com/siddontang/go/hack" 12 "github.com/siddontang/go/num" 13 "github.com/siddontang/ledisdb/ledis" 14) 15 16func slaveofCommand(c *client) error { 17 args := c.args 18 19 if len(args) != 2 && len(args) != 3 { 20 return ErrCmdParams 21 } 22 23 masterAddr := "" 24 restart := false 25 readonly := false 26 27 if strings.ToLower(hack.String(args[0])) == "no" && 28 strings.ToLower(hack.String(args[1])) == "one" { 29 //stop replication, use master = "" 30 if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "readonly" { 31 readonly = true 32 } 33 } else { 34 if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil { 35 return err 36 } 37 38 masterAddr = fmt.Sprintf("%s:%s", args[0], args[1]) 39 40 if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "restart" { 41 restart = true 42 } 43 } 44 45 if err := c.app.slaveof(masterAddr, restart, readonly); err != nil { 46 return err 47 } 48 49 c.resp.writeStatus(OK) 50 51 return nil 52} 53 54func fullsyncCommand(c *client) error { 55 args := c.args 56 needNew := false 57 if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" { 58 needNew = true 59 } 60 61 var s *snapshot 62 var err error 63 var t time.Time 64 65 dumper := c.app.ldb 66 67 if needNew { 68 s, t, err = c.app.snap.Create(dumper) 69 } else { 70 if s, t, err = c.app.snap.OpenLatest(); err != nil { 71 return err 72 } else if s == nil { 73 s, t, err = c.app.snap.Create(dumper) 74 } else { 75 gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2 76 minT := time.Now().Add(-gap) 77 78 //snapshot is too old 79 if t.Before(minT) { 80 s.Close() 81 s, t, err = c.app.snap.Create(dumper) 82 } 83 } 84 } 85 86 if err != nil { 87 return err 88 } 89 90 n := s.Size() 91 92 c.resp.writeBulkFrom(n, s) 93 94 s.Close() 95 96 return nil 97} 98 99var dummyBuf = make([]byte, 8) 100 101func syncCommand(c *client) error { 102 args := c.args 103 if len(args) != 1 { 104 return ErrCmdParams 105 } 106 107 var logId uint64 108 var err error 109 110 if logId, err = ledis.StrUint64(args[0], nil); err != nil { 111 return ErrCmdParams 112 } 113 114 lastLogID := logId - 1 115 116 stat, err := c.app.ldb.ReplicationStat() 117 if err != nil { 118 return err 119 } 120 121 if lastLogID > stat.LastID { 122 return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID) 123 } 124 125 c.lastLogID.Set(lastLogID) 126 127 if lastLogID == stat.LastID { 128 c.app.slaveAck(c) 129 } 130 131 c.syncBuf.Reset() 132 133 c.syncBuf.Write(dummyBuf) 134 135 if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 1, c.app.quit); err != nil { 136 return err 137 } else { 138 buf := c.syncBuf.Bytes() 139 140 stat, err = c.app.ldb.ReplicationStat() 141 if err != nil { 142 return err 143 } 144 145 binary.BigEndian.PutUint64(buf, stat.LastID) 146 147 c.resp.writeBulk(buf) 148 } 149 150 return nil 151} 152 153//inner command, only for replication 154//REPLCONF <option> <value> <option> <value> ... 155func replconfCommand(c *client) error { 156 args := c.args 157 if len(args)%2 != 0 { 158 return ErrCmdParams 159 } 160 161 if !c.app.ldb.ReplicationUsed() { 162 return ledis.ErrRplNotSupport 163 } 164 165 //now only support "listening-port" 166 for i := 0; i < len(args); i += 2 { 167 switch strings.ToLower(hack.String(args[i])) { 168 case "listening-port": 169 var host string 170 var err error 171 if _, err = num.ParseUint16(hack.String(args[i+1])); err != nil { 172 return err 173 } 174 if host, _, err = net.SplitHostPort(c.remoteAddr); err != nil { 175 return err 176 } else { 177 c.slaveListeningAddr = net.JoinHostPort(host, hack.String(args[i+1])) 178 } 179 180 c.app.addSlave(c) 181 default: 182 return ErrSyntax 183 } 184 } 185 186 c.resp.writeStatus(OK) 187 return nil 188} 189 190func roleCommand(c *client) error { 191 if len(c.args) != 0 { 192 return ErrCmdParams 193 } 194 195 c.app.m.Lock() 196 slaveof := c.app.cfg.SlaveOf 197 c.app.m.Unlock() 198 199 isMaster := len(slaveof) == 0 200 201 ay := make([]interface{}, 0, 5) 202 203 var lastId int64 = 0 204 205 stat, _ := c.app.ldb.ReplicationStat() 206 if stat != nil { 207 lastId = int64(stat.LastID) 208 } 209 210 if isMaster { 211 ay = append(ay, []byte("master")) 212 ay = append(ay, lastId) 213 214 items := make([]interface{}, 0, 3) 215 216 c.app.slock.Lock() 217 for addr, slave := range c.app.slaves { 218 host, port, _ := splitHostPort(addr) 219 220 items = append(items, []interface{}{[]byte(host), 221 strconv.AppendUint(nil, uint64(port), 10), 222 strconv.AppendUint(nil, slave.lastLogID.Get(), 10)}) 223 } 224 c.app.slock.Unlock() 225 ay = append(ay, items) 226 } else { 227 host, port, _ := splitHostPort(slaveof) 228 ay = append(ay, []byte("slave")) 229 ay = append(ay, []byte(host)) 230 ay = append(ay, int64(port)) 231 ay = append(ay, []byte(replStatetring(c.app.m.state.Get()))) 232 ay = append(ay, lastId) 233 } 234 235 c.resp.writeArray(ay) 236 return nil 237} 238 239func replStatetring(r int32) string { 240 switch r { 241 case replConnectState: 242 return "connect" 243 case replConnectingState: 244 return "connecting" 245 case replSyncState: 246 return "sync" 247 case replConnectedState: 248 return "connected" 249 default: 250 return "unknown" 251 } 252} 253 254func splitHostPort(str string) (string, int16, error) { 255 host, port, err := net.SplitHostPort(str) 256 if err != nil { 257 return "", 0, err 258 } 259 260 p, err := strconv.ParseInt(port, 10, 16) 261 if err != nil { 262 return "", 0, err 263 } 264 265 return host, int16(p), nil 266} 267 268func init() { 269 register("slaveof", slaveofCommand) 270 register("fullsync", fullsyncCommand) 271 register("sync", syncCommand) 272 register("replconf", replconfCommand) 273 register("role", roleCommand) 274} 275