1package server 2 3import ( 4 "bytes" 5 // "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/siddontang/go/sync2" 11 "github.com/siddontang/ledisdb/ledis" 12) 13 14type responseWriter interface { 15 writeError(error) 16 writeStatus(string) 17 writeInteger(int64) 18 writeBulk([]byte) 19 writeArray([]interface{}) 20 writeSliceArray([][]byte) 21 writeFVPairArray([]ledis.FVPair) 22 writeScorePairArray([]ledis.ScorePair, bool) 23 writeBulkFrom(int64, io.Reader) 24 flush() 25} 26 27type syncAck struct { 28 id uint64 29 ch chan uint64 30} 31 32type client struct { 33 app *App 34 ldb *ledis.Ledis 35 36 db *ledis.DB 37 38 remoteAddr string 39 cmd string 40 args [][]byte 41 42 isAuthed bool 43 44 resp responseWriter 45 46 syncBuf bytes.Buffer 47 48 lastLogID sync2.AtomicUint64 49 50 // reqErr chan error 51 52 buf bytes.Buffer 53 54 slaveListeningAddr string 55} 56 57func newClient(app *App) *client { 58 c := new(client) 59 60 c.app = app 61 c.ldb = app.ldb 62 c.isAuthed = false 63 c.db, _ = app.ldb.Select(0) //use default db 64 65 return c 66} 67 68func (c *client) close() { 69 70} 71 72func (c *client) authEnabled() bool { 73 return len(c.app.cfg.AuthPassword) > 0 || c.app.cfg.AuthMethod != nil 74} 75 76func (c *client) perform() { 77 var err error 78 79 start := time.Now() 80 81 c.cmd = strings.ToLower(c.cmd) 82 83 if len(c.cmd) == 0 { 84 err = ErrEmptyCommand 85 } else if exeCmd, ok := regCmds[c.cmd]; !ok { 86 err = ErrNotFound 87 } else if c.authEnabled() && !c.isAuthed && c.cmd != "auth" { 88 err = ErrNotAuthenticated 89 } else { 90 err = exeCmd(c) 91 } 92 93 if c.app.access != nil { 94 duration := time.Since(start) 95 96 fullCmd := c.catGenericCommand() 97 cost := duration.Nanoseconds() / 1000000 98 99 truncateLen := len(fullCmd) 100 if truncateLen > 256 { 101 truncateLen = 256 102 } 103 104 c.app.access.Log(c.remoteAddr, cost, fullCmd[:truncateLen], err) 105 } 106 107 if err != nil { 108 c.resp.writeError(err) 109 } 110 c.resp.flush() 111 return 112} 113 114func (c *client) catGenericCommand() []byte { 115 buffer := c.buf 116 buffer.Reset() 117 118 buffer.Write([]byte(c.cmd)) 119 120 for _, arg := range c.args { 121 buffer.WriteByte(' ') 122 buffer.Write(arg) 123 } 124 125 return buffer.Bytes() 126} 127 128func writeValue(w responseWriter, value interface{}) { 129 switch v := value.(type) { 130 case []interface{}: 131 w.writeArray(v) 132 case [][]byte: 133 w.writeSliceArray(v) 134 case []byte: 135 w.writeBulk(v) 136 case string: 137 w.writeStatus(v) 138 case nil: 139 w.writeBulk(nil) 140 case int64: 141 w.writeInteger(v) 142 default: 143 panic("invalid value type") 144 } 145} 146