1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package command
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"strings"
22	"sync/atomic"
23	"time"
24
25	"go.etcd.io/etcd/clientv3"
26	"go.etcd.io/etcd/clientv3/mirror"
27	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
28	"go.etcd.io/etcd/mvcc/mvccpb"
29
30	"github.com/spf13/cobra"
31)
32
// Values of the make-mirror command-line flags. They are bound to their flags
// in NewMakeMirrorCommand and read by makeMirrorCommandFunc, makeMirror and
// modifyPrefix.
var (
	// Transport security settings for the destination cluster connection
	// (--dest-insecure-transport, --dest-cert, --dest-key, --dest-cacert).
	mminsecureTr            bool
	mmcert, mmkey, mmcacert string

	// Key prefix selection and rewriting
	// (--prefix, --dest-prefix, --no-dest-prefix).
	mmprefix, mmdestprefix string
	mmnodestprefix         bool
)
42
43// NewMakeMirrorCommand returns the cobra command for "makeMirror".
44func NewMakeMirrorCommand() *cobra.Command {
45	c := &cobra.Command{
46		Use:   "make-mirror [options] <destination>",
47		Short: "Makes a mirror at the destination etcd cluster",
48		Run:   makeMirrorCommandFunc,
49	}
50
51	c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror")
52	c.Flags().StringVar(&mmdestprefix, "dest-prefix", "", "destination prefix to mirror a prefix to a different prefix in the destination cluster")
53	c.Flags().BoolVar(&mmnodestprefix, "no-dest-prefix", false, "mirror key-values to the root of the destination cluster")
54	c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster")
55	c.Flags().StringVar(&mmkey, "dest-key", "", "Identify secure client using this TLS key file")
56	c.Flags().StringVar(&mmcacert, "dest-cacert", "", "Verify certificates of TLS enabled secure servers using this CA bundle")
57	// TODO: secure by default when etcd enables secure gRPC by default.
58	c.Flags().BoolVar(&mminsecureTr, "dest-insecure-transport", true, "Disable transport security for client connections")
59
60	return c
61}
62
63func makeMirrorCommandFunc(cmd *cobra.Command, args []string) {
64	if len(args) != 1 {
65		ExitWithError(ExitBadArgs, errors.New("make-mirror takes one destination argument"))
66	}
67
68	dialTimeout := dialTimeoutFromCmd(cmd)
69	keepAliveTime := keepAliveTimeFromCmd(cmd)
70	keepAliveTimeout := keepAliveTimeoutFromCmd(cmd)
71	sec := &secureCfg{
72		cert:              mmcert,
73		key:               mmkey,
74		cacert:            mmcacert,
75		insecureTransport: mminsecureTr,
76	}
77
78	cc := &clientConfig{
79		endpoints:        []string{args[0]},
80		dialTimeout:      dialTimeout,
81		keepAliveTime:    keepAliveTime,
82		keepAliveTimeout: keepAliveTimeout,
83		scfg:             sec,
84		acfg:             nil,
85	}
86	dc := cc.mustClient()
87	c := mustClientFromCmd(cmd)
88
89	err := makeMirror(context.TODO(), c, dc)
90	ExitWithError(ExitError, err)
91}
92
93func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) error {
94	total := int64(0)
95
96	go func() {
97		for {
98			time.Sleep(30 * time.Second)
99			fmt.Println(atomic.LoadInt64(&total))
100		}
101	}()
102
103	s := mirror.NewSyncer(c, mmprefix, 0)
104
105	rc, errc := s.SyncBase(ctx)
106
107	// if destination prefix is specified and remove destination prefix is true return error
108	if mmnodestprefix && len(mmdestprefix) > 0 {
109		ExitWithError(ExitBadArgs, fmt.Errorf("`--dest-prefix` and `--no-dest-prefix` cannot be set at the same time, choose one"))
110	}
111
112	// if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix
113	if !mmnodestprefix && len(mmdestprefix) == 0 {
114		mmdestprefix = mmprefix
115	}
116
117	for r := range rc {
118		for _, kv := range r.Kvs {
119			_, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value))
120			if err != nil {
121				return err
122			}
123			atomic.AddInt64(&total, 1)
124		}
125	}
126
127	err := <-errc
128	if err != nil {
129		return err
130	}
131
132	wc := s.SyncUpdates(ctx)
133
134	for wr := range wc {
135		if wr.CompactRevision != 0 {
136			return rpctypes.ErrCompacted
137		}
138
139		var lastRev int64
140		ops := []clientv3.Op{}
141
142		for _, ev := range wr.Events {
143			nextRev := ev.Kv.ModRevision
144			if lastRev != 0 && nextRev > lastRev {
145				_, err := dc.Txn(ctx).Then(ops...).Commit()
146				if err != nil {
147					return err
148				}
149				ops = []clientv3.Op{}
150			}
151			lastRev = nextRev
152			switch ev.Type {
153			case mvccpb.PUT:
154				ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))
155				atomic.AddInt64(&total, 1)
156			case mvccpb.DELETE:
157				ops = append(ops, clientv3.OpDelete(modifyPrefix(string(ev.Kv.Key))))
158				atomic.AddInt64(&total, 1)
159			default:
160				panic("unexpected event type")
161			}
162		}
163
164		if len(ops) != 0 {
165			_, err := dc.Txn(ctx).Then(ops...).Commit()
166			if err != nil {
167				return err
168			}
169		}
170	}
171
172	return nil
173}
174
175func modifyPrefix(key string) string {
176	return strings.Replace(key, mmprefix, mmdestprefix, 1)
177}
178