1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package command 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "strings" 22 "sync/atomic" 23 "time" 24 25 "go.etcd.io/etcd/clientv3" 26 "go.etcd.io/etcd/clientv3/mirror" 27 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" 28 "go.etcd.io/etcd/mvcc/mvccpb" 29 30 "github.com/spf13/cobra" 31) 32 33var ( 34 mminsecureTr bool 35 mmcert string 36 mmkey string 37 mmcacert string 38 mmprefix string 39 mmdestprefix string 40 mmnodestprefix bool 41) 42 43// NewMakeMirrorCommand returns the cobra command for "makeMirror". 44func NewMakeMirrorCommand() *cobra.Command { 45 c := &cobra.Command{ 46 Use: "make-mirror [options] <destination>", 47 Short: "Makes a mirror at the destination etcd cluster", 48 Run: makeMirrorCommandFunc, 49 } 50 51 c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror") 52 c.Flags().StringVar(&mmdestprefix, "dest-prefix", "", "destination prefix to mirror a prefix to a different prefix in the destination cluster") 53 c.Flags().BoolVar(&mmnodestprefix, "no-dest-prefix", false, "mirror key-values to the root of the destination cluster") 54 c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster") 55 c.Flags().StringVar(&mmkey, "dest-key", "", "Identify secure client using this TLS key file") 56 c.Flags().StringVar(&mmcacert, "dest-cacert", "", "Verify certificates of TLS enabled secure servers using this CA bundle") 57 // TODO: secure by default when etcd enables secure gRPC by default. 58 c.Flags().BoolVar(&mminsecureTr, "dest-insecure-transport", true, "Disable transport security for client connections") 59 60 return c 61} 62 63func makeMirrorCommandFunc(cmd *cobra.Command, args []string) { 64 if len(args) != 1 { 65 ExitWithError(ExitBadArgs, errors.New("make-mirror takes one destination argument")) 66 } 67 68 dialTimeout := dialTimeoutFromCmd(cmd) 69 keepAliveTime := keepAliveTimeFromCmd(cmd) 70 keepAliveTimeout := keepAliveTimeoutFromCmd(cmd) 71 sec := &secureCfg{ 72 cert: mmcert, 73 key: mmkey, 74 cacert: mmcacert, 75 insecureTransport: mminsecureTr, 76 } 77 78 cc := &clientConfig{ 79 endpoints: []string{args[0]}, 80 dialTimeout: dialTimeout, 81 keepAliveTime: keepAliveTime, 82 keepAliveTimeout: keepAliveTimeout, 83 scfg: sec, 84 acfg: nil, 85 } 86 dc := cc.mustClient() 87 c := mustClientFromCmd(cmd) 88 89 err := makeMirror(context.TODO(), c, dc) 90 ExitWithError(ExitError, err) 91} 92 93func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) error { 94 total := int64(0) 95 96 go func() { 97 for { 98 time.Sleep(30 * time.Second) 99 fmt.Println(atomic.LoadInt64(&total)) 100 } 101 }() 102 103 s := mirror.NewSyncer(c, mmprefix, 0) 104 105 rc, errc := s.SyncBase(ctx) 106 107 // if destination prefix is specified and remove destination prefix is true return error 108 if mmnodestprefix && len(mmdestprefix) > 0 { 109 ExitWithError(ExitBadArgs, fmt.Errorf("`--dest-prefix` and `--no-dest-prefix` cannot be set at the same time, choose one")) 110 } 111 112 // if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix 113 if !mmnodestprefix && len(mmdestprefix) == 0 { 114 mmdestprefix = mmprefix 115 } 116 117 for r := range rc { 118 for _, kv := range r.Kvs { 119 _, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value)) 120 if err != nil { 121 return err 122 } 123 atomic.AddInt64(&total, 1) 124 } 125 } 126 127 err := <-errc 128 if err != nil { 129 return err 130 } 131 132 wc := s.SyncUpdates(ctx) 133 134 for wr := range wc { 135 if wr.CompactRevision != 0 { 136 return rpctypes.ErrCompacted 137 } 138 139 var lastRev int64 140 ops := []clientv3.Op{} 141 142 for _, ev := range wr.Events { 143 nextRev := ev.Kv.ModRevision 144 if lastRev != 0 && nextRev > lastRev { 145 _, err := dc.Txn(ctx).Then(ops...).Commit() 146 if err != nil { 147 return err 148 } 149 ops = []clientv3.Op{} 150 } 151 lastRev = nextRev 152 switch ev.Type { 153 case mvccpb.PUT: 154 ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value))) 155 atomic.AddInt64(&total, 1) 156 case mvccpb.DELETE: 157 ops = append(ops, clientv3.OpDelete(modifyPrefix(string(ev.Kv.Key)))) 158 atomic.AddInt64(&total, 1) 159 default: 160 panic("unexpected event type") 161 } 162 } 163 164 if len(ops) != 0 { 165 _, err := dc.Txn(ctx).Then(ops...).Commit() 166 if err != nil { 167 return err 168 } 169 } 170 } 171 172 return nil 173} 174 175func modifyPrefix(key string) string { 176 return strings.Replace(key, mmprefix, mmdestprefix, 1) 177} 178