1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package command
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"github.com/bgentry/speakeasy"
22	"strings"
23	"sync/atomic"
24	"time"
25
26	"go.etcd.io/etcd/api/v3/mvccpb"
27	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
28	"go.etcd.io/etcd/client/v3"
29	"go.etcd.io/etcd/client/v3/mirror"
30
31	"github.com/spf13/cobra"
32)
33
// Package-level storage for the make-mirror flags. Each variable is bound to
// its flag in NewMakeMirrorCommand and read when the command runs.
var (
	mminsecureTr   bool   // --dest-insecure-transport: disable transport security to the destination
	mmcert         string // --dest-cert: TLS certificate file for the destination cluster
	mmkey          string // --dest-key: TLS key file for the destination cluster
	mmcacert       string // --dest-cacert: CA bundle used to verify the destination servers
	mmprefix       string // --prefix: key prefix to mirror from the source cluster
	mmdestprefix   string // --dest-prefix: prefix substituted for mmprefix in destination keys
	mmuser         string // --dest-user: destination username, optionally "user:password"
	mmpassword     string // --dest-password: destination password
	mmnodestprefix bool   // --no-dest-prefix: mirror keys to the root of the destination cluster
)
45
46// NewMakeMirrorCommand returns the cobra command for "makeMirror".
47func NewMakeMirrorCommand() *cobra.Command {
48	c := &cobra.Command{
49		Use:   "make-mirror [options] <destination>",
50		Short: "Makes a mirror at the destination etcd cluster",
51		Run:   makeMirrorCommandFunc,
52	}
53
54	c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror")
55	c.Flags().StringVar(&mmdestprefix, "dest-prefix", "", "destination prefix to mirror a prefix to a different prefix in the destination cluster")
56	c.Flags().BoolVar(&mmnodestprefix, "no-dest-prefix", false, "mirror key-values to the root of the destination cluster")
57	c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster")
58	c.Flags().StringVar(&mmkey, "dest-key", "", "Identify secure client using this TLS key file")
59	c.Flags().StringVar(&mmcacert, "dest-cacert", "", "Verify certificates of TLS enabled secure servers using this CA bundle")
60	// TODO: secure by default when etcd enables secure gRPC by default.
61	c.Flags().BoolVar(&mminsecureTr, "dest-insecure-transport", true, "Disable transport security for client connections")
62	c.Flags().StringVar(&mmuser, "dest-user", "", "Destination username[:password] for authentication (prompt if password is not supplied)")
63	c.Flags().StringVar(&mmpassword, "dest-password", "", "Destination password for authentication (if this option is used, --user option shouldn't include password)")
64
65	return c
66}
67
68func authDestCfg() *authCfg {
69	if mmuser == "" {
70		return nil
71	}
72
73	var cfg authCfg
74
75	if mmpassword == "" {
76		splitted := strings.SplitN(mmuser, ":", 2)
77		if len(splitted) < 2 {
78			var err error
79			cfg.username = mmuser
80			cfg.password, err = speakeasy.Ask("Destination Password: ")
81			if err != nil {
82				ExitWithError(ExitError, err)
83			}
84		} else {
85			cfg.username = splitted[0]
86			cfg.password = splitted[1]
87		}
88	} else {
89		cfg.username = mmuser
90		cfg.password = mmpassword
91	}
92
93	return &cfg
94}
95
96func makeMirrorCommandFunc(cmd *cobra.Command, args []string) {
97	if len(args) != 1 {
98		ExitWithError(ExitBadArgs, errors.New("make-mirror takes one destination argument"))
99	}
100
101	dialTimeout := dialTimeoutFromCmd(cmd)
102	keepAliveTime := keepAliveTimeFromCmd(cmd)
103	keepAliveTimeout := keepAliveTimeoutFromCmd(cmd)
104	sec := &secureCfg{
105		cert:              mmcert,
106		key:               mmkey,
107		cacert:            mmcacert,
108		insecureTransport: mminsecureTr,
109	}
110
111	auth := authDestCfg()
112
113	cc := &clientConfig{
114		endpoints:        []string{args[0]},
115		dialTimeout:      dialTimeout,
116		keepAliveTime:    keepAliveTime,
117		keepAliveTimeout: keepAliveTimeout,
118		scfg:             sec,
119		acfg:             auth,
120	}
121	dc := cc.mustClient()
122	c := mustClientFromCmd(cmd)
123
124	err := makeMirror(context.TODO(), c, dc)
125	ExitWithError(ExitError, err)
126}
127
128func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) error {
129	total := int64(0)
130
131	go func() {
132		for {
133			time.Sleep(30 * time.Second)
134			fmt.Println(atomic.LoadInt64(&total))
135		}
136	}()
137
138	s := mirror.NewSyncer(c, mmprefix, 0)
139
140	rc, errc := s.SyncBase(ctx)
141
142	// if destination prefix is specified and remove destination prefix is true return error
143	if mmnodestprefix && len(mmdestprefix) > 0 {
144		ExitWithError(ExitBadArgs, fmt.Errorf("`--dest-prefix` and `--no-dest-prefix` cannot be set at the same time, choose one"))
145	}
146
147	// if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix
148	if !mmnodestprefix && len(mmdestprefix) == 0 {
149		mmdestprefix = mmprefix
150	}
151
152	for r := range rc {
153		for _, kv := range r.Kvs {
154			_, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value))
155			if err != nil {
156				return err
157			}
158			atomic.AddInt64(&total, 1)
159		}
160	}
161
162	err := <-errc
163	if err != nil {
164		return err
165	}
166
167	wc := s.SyncUpdates(ctx)
168
169	for wr := range wc {
170		if wr.CompactRevision != 0 {
171			return rpctypes.ErrCompacted
172		}
173
174		var lastRev int64
175		ops := []clientv3.Op{}
176
177		for _, ev := range wr.Events {
178			nextRev := ev.Kv.ModRevision
179			if lastRev != 0 && nextRev > lastRev {
180				_, err := dc.Txn(ctx).Then(ops...).Commit()
181				if err != nil {
182					return err
183				}
184				ops = []clientv3.Op{}
185			}
186			lastRev = nextRev
187			switch ev.Type {
188			case mvccpb.PUT:
189				ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))
190				atomic.AddInt64(&total, 1)
191			case mvccpb.DELETE:
192				ops = append(ops, clientv3.OpDelete(modifyPrefix(string(ev.Kv.Key))))
193				atomic.AddInt64(&total, 1)
194			default:
195				panic("unexpected event type")
196			}
197		}
198
199		if len(ops) != 0 {
200			_, err := dc.Txn(ctx).Then(ops...).Commit()
201			if err != nil {
202				return err
203			}
204		}
205	}
206
207	return nil
208}
209
210func modifyPrefix(key string) string {
211	return strings.Replace(key, mmprefix, mmdestprefix, 1)
212}
213