1package server
2
3import (
4	"bytes"
5	//	"fmt"
6	"io"
7	"strings"
8	"time"
9
10	"github.com/siddontang/go/sync2"
11	"github.com/siddontang/ledisdb/ledis"
12)
13
14type responseWriter interface {
15	writeError(error)
16	writeStatus(string)
17	writeInteger(int64)
18	writeBulk([]byte)
19	writeArray([]interface{})
20	writeSliceArray([][]byte)
21	writeFVPairArray([]ledis.FVPair)
22	writeScorePairArray([]ledis.ScorePair, bool)
23	writeBulkFrom(int64, io.Reader)
24	flush()
25}
26
27type syncAck struct {
28	id uint64
29	ch chan uint64
30}
31
32type client struct {
33	app *App
34	ldb *ledis.Ledis
35
36	db *ledis.DB
37
38	remoteAddr string
39	cmd        string
40	args       [][]byte
41
42	isAuthed bool
43
44	resp responseWriter
45
46	syncBuf bytes.Buffer
47
48	lastLogID sync2.AtomicUint64
49
50	// reqErr chan error
51
52	buf bytes.Buffer
53
54	slaveListeningAddr string
55}
56
57func newClient(app *App) *client {
58	c := new(client)
59
60	c.app = app
61	c.ldb = app.ldb
62	c.isAuthed = false
63	c.db, _ = app.ldb.Select(0) //use default db
64
65	return c
66}
67
68func (c *client) close() {
69
70}
71
72func (c *client) authEnabled() bool {
73	return len(c.app.cfg.AuthPassword) > 0 || c.app.cfg.AuthMethod != nil
74}
75
76func (c *client) perform() {
77	var err error
78
79	start := time.Now()
80
81	c.cmd = strings.ToLower(c.cmd)
82
83	if len(c.cmd) == 0 {
84		err = ErrEmptyCommand
85	} else if exeCmd, ok := regCmds[c.cmd]; !ok {
86		err = ErrNotFound
87	} else if c.authEnabled() && !c.isAuthed && c.cmd != "auth" {
88		err = ErrNotAuthenticated
89	} else {
90		err = exeCmd(c)
91	}
92
93	if c.app.access != nil {
94		duration := time.Since(start)
95
96		fullCmd := c.catGenericCommand()
97		cost := duration.Nanoseconds() / 1000000
98
99		truncateLen := len(fullCmd)
100		if truncateLen > 256 {
101			truncateLen = 256
102		}
103
104		c.app.access.Log(c.remoteAddr, cost, fullCmd[:truncateLen], err)
105	}
106
107	if err != nil {
108		c.resp.writeError(err)
109	}
110	c.resp.flush()
111	return
112}
113
114func (c *client) catGenericCommand() []byte {
115	buffer := c.buf
116	buffer.Reset()
117
118	buffer.Write([]byte(c.cmd))
119
120	for _, arg := range c.args {
121		buffer.WriteByte(' ')
122		buffer.Write(arg)
123	}
124
125	return buffer.Bytes()
126}
127
128func writeValue(w responseWriter, value interface{}) {
129	switch v := value.(type) {
130	case []interface{}:
131		w.writeArray(v)
132	case [][]byte:
133		w.writeSliceArray(v)
134	case []byte:
135		w.writeBulk(v)
136	case string:
137		w.writeStatus(v)
138	case nil:
139		w.writeBulk(nil)
140	case int64:
141		w.writeInteger(v)
142	default:
143		panic("invalid value type")
144	}
145}
146