1package server
2
3import (
4	"encoding/binary"
5	"fmt"
6	"net"
7	"strconv"
8	"strings"
9	"time"
10
11	"github.com/siddontang/go/hack"
12	"github.com/siddontang/go/num"
13	"github.com/siddontang/ledisdb/ledis"
14)
15
16func slaveofCommand(c *client) error {
17	args := c.args
18
19	if len(args) != 2 && len(args) != 3 {
20		return ErrCmdParams
21	}
22
23	masterAddr := ""
24	restart := false
25	readonly := false
26
27	if strings.ToLower(hack.String(args[0])) == "no" &&
28		strings.ToLower(hack.String(args[1])) == "one" {
29		//stop replication, use master = ""
30		if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "readonly" {
31			readonly = true
32		}
33	} else {
34		if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil {
35			return err
36		}
37
38		masterAddr = fmt.Sprintf("%s:%s", args[0], args[1])
39
40		if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "restart" {
41			restart = true
42		}
43	}
44
45	if err := c.app.slaveof(masterAddr, restart, readonly); err != nil {
46		return err
47	}
48
49	c.resp.writeStatus(OK)
50
51	return nil
52}
53
54func fullsyncCommand(c *client) error {
55	args := c.args
56	needNew := false
57	if len(args) == 1 && strings.ToLower(hack.String(args[0])) == "new" {
58		needNew = true
59	}
60
61	var s *snapshot
62	var err error
63	var t time.Time
64
65	dumper := c.app.ldb
66
67	if needNew {
68		s, t, err = c.app.snap.Create(dumper)
69	} else {
70		if s, t, err = c.app.snap.OpenLatest(); err != nil {
71			return err
72		} else if s == nil {
73			s, t, err = c.app.snap.Create(dumper)
74		} else {
75			gap := time.Duration(c.app.cfg.Replication.ExpiredLogDays*24*3600) * time.Second / 2
76			minT := time.Now().Add(-gap)
77
78			//snapshot is too old
79			if t.Before(minT) {
80				s.Close()
81				s, t, err = c.app.snap.Create(dumper)
82			}
83		}
84	}
85
86	if err != nil {
87		return err
88	}
89
90	n := s.Size()
91
92	c.resp.writeBulkFrom(n, s)
93
94	s.Close()
95
96	return nil
97}
98
99var dummyBuf = make([]byte, 8)
100
101func syncCommand(c *client) error {
102	args := c.args
103	if len(args) != 1 {
104		return ErrCmdParams
105	}
106
107	var logId uint64
108	var err error
109
110	if logId, err = ledis.StrUint64(args[0], nil); err != nil {
111		return ErrCmdParams
112	}
113
114	lastLogID := logId - 1
115
116	stat, err := c.app.ldb.ReplicationStat()
117	if err != nil {
118		return err
119	}
120
121	if lastLogID > stat.LastID {
122		return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID)
123	}
124
125	c.lastLogID.Set(lastLogID)
126
127	if lastLogID == stat.LastID {
128		c.app.slaveAck(c)
129	}
130
131	c.syncBuf.Reset()
132
133	c.syncBuf.Write(dummyBuf)
134
135	if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 1, c.app.quit); err != nil {
136		return err
137	} else {
138		buf := c.syncBuf.Bytes()
139
140		stat, err = c.app.ldb.ReplicationStat()
141		if err != nil {
142			return err
143		}
144
145		binary.BigEndian.PutUint64(buf, stat.LastID)
146
147		c.resp.writeBulk(buf)
148	}
149
150	return nil
151}
152
153//inner command, only for replication
154//REPLCONF <option> <value> <option> <value> ...
155func replconfCommand(c *client) error {
156	args := c.args
157	if len(args)%2 != 0 {
158		return ErrCmdParams
159	}
160
161	if !c.app.ldb.ReplicationUsed() {
162		return ledis.ErrRplNotSupport
163	}
164
165	//now only support "listening-port"
166	for i := 0; i < len(args); i += 2 {
167		switch strings.ToLower(hack.String(args[i])) {
168		case "listening-port":
169			var host string
170			var err error
171			if _, err = num.ParseUint16(hack.String(args[i+1])); err != nil {
172				return err
173			}
174			if host, _, err = net.SplitHostPort(c.remoteAddr); err != nil {
175				return err
176			} else {
177				c.slaveListeningAddr = net.JoinHostPort(host, hack.String(args[i+1]))
178			}
179
180			c.app.addSlave(c)
181		default:
182			return ErrSyntax
183		}
184	}
185
186	c.resp.writeStatus(OK)
187	return nil
188}
189
190func roleCommand(c *client) error {
191	if len(c.args) != 0 {
192		return ErrCmdParams
193	}
194
195	c.app.m.Lock()
196	slaveof := c.app.cfg.SlaveOf
197	c.app.m.Unlock()
198
199	isMaster := len(slaveof) == 0
200
201	ay := make([]interface{}, 0, 5)
202
203	var lastId int64 = 0
204
205	stat, _ := c.app.ldb.ReplicationStat()
206	if stat != nil {
207		lastId = int64(stat.LastID)
208	}
209
210	if isMaster {
211		ay = append(ay, []byte("master"))
212		ay = append(ay, lastId)
213
214		items := make([]interface{}, 0, 3)
215
216		c.app.slock.Lock()
217		for addr, slave := range c.app.slaves {
218			host, port, _ := splitHostPort(addr)
219
220			items = append(items, []interface{}{[]byte(host),
221				strconv.AppendUint(nil, uint64(port), 10),
222				strconv.AppendUint(nil, slave.lastLogID.Get(), 10)})
223		}
224		c.app.slock.Unlock()
225		ay = append(ay, items)
226	} else {
227		host, port, _ := splitHostPort(slaveof)
228		ay = append(ay, []byte("slave"))
229		ay = append(ay, []byte(host))
230		ay = append(ay, int64(port))
231		ay = append(ay, []byte(replStatetring(c.app.m.state.Get())))
232		ay = append(ay, lastId)
233	}
234
235	c.resp.writeArray(ay)
236	return nil
237}
238
239func replStatetring(r int32) string {
240	switch r {
241	case replConnectState:
242		return "connect"
243	case replConnectingState:
244		return "connecting"
245	case replSyncState:
246		return "sync"
247	case replConnectedState:
248		return "connected"
249	default:
250		return "unknown"
251	}
252}
253
254func splitHostPort(str string) (string, int16, error) {
255	host, port, err := net.SplitHostPort(str)
256	if err != nil {
257		return "", 0, err
258	}
259
260	p, err := strconv.ParseInt(port, 10, 16)
261	if err != nil {
262		return "", 0, err
263	}
264
265	return host, int16(p), nil
266}
267
268func init() {
269	register("slaveof", slaveofCommand)
270	register("fullsync", fullsyncCommand)
271	register("sync", syncCommand)
272	register("replconf", replconfCommand)
273	register("role", roleCommand)
274}
275