1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package command 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "github.com/bgentry/speakeasy" 22 "strings" 23 "sync/atomic" 24 "time" 25 26 "go.etcd.io/etcd/api/v3/mvccpb" 27 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" 28 "go.etcd.io/etcd/client/v3" 29 "go.etcd.io/etcd/client/v3/mirror" 30 31 "github.com/spf13/cobra" 32) 33 34var ( 35 mminsecureTr bool 36 mmcert string 37 mmkey string 38 mmcacert string 39 mmprefix string 40 mmdestprefix string 41 mmuser string 42 mmpassword string 43 mmnodestprefix bool 44) 45 46// NewMakeMirrorCommand returns the cobra command for "makeMirror". 47func NewMakeMirrorCommand() *cobra.Command { 48 c := &cobra.Command{ 49 Use: "make-mirror [options] <destination>", 50 Short: "Makes a mirror at the destination etcd cluster", 51 Run: makeMirrorCommandFunc, 52 } 53 54 c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror") 55 c.Flags().StringVar(&mmdestprefix, "dest-prefix", "", "destination prefix to mirror a prefix to a different prefix in the destination cluster") 56 c.Flags().BoolVar(&mmnodestprefix, "no-dest-prefix", false, "mirror key-values to the root of the destination cluster") 57 c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster") 58 c.Flags().StringVar(&mmkey, "dest-key", "", "Identify secure client using this TLS key file") 59 c.Flags().StringVar(&mmcacert, "dest-cacert", "", "Verify certificates of TLS enabled secure servers using this CA bundle") 60 // TODO: secure by default when etcd enables secure gRPC by default. 61 c.Flags().BoolVar(&mminsecureTr, "dest-insecure-transport", true, "Disable transport security for client connections") 62 c.Flags().StringVar(&mmuser, "dest-user", "", "Destination username[:password] for authentication (prompt if password is not supplied)") 63 c.Flags().StringVar(&mmpassword, "dest-password", "", "Destination password for authentication (if this option is used, --user option shouldn't include password)") 64 65 return c 66} 67 68func authDestCfg() *authCfg { 69 if mmuser == "" { 70 return nil 71 } 72 73 var cfg authCfg 74 75 if mmpassword == "" { 76 splitted := strings.SplitN(mmuser, ":", 2) 77 if len(splitted) < 2 { 78 var err error 79 cfg.username = mmuser 80 cfg.password, err = speakeasy.Ask("Destination Password: ") 81 if err != nil { 82 ExitWithError(ExitError, err) 83 } 84 } else { 85 cfg.username = splitted[0] 86 cfg.password = splitted[1] 87 } 88 } else { 89 cfg.username = mmuser 90 cfg.password = mmpassword 91 } 92 93 return &cfg 94} 95 96func makeMirrorCommandFunc(cmd *cobra.Command, args []string) { 97 if len(args) != 1 { 98 ExitWithError(ExitBadArgs, errors.New("make-mirror takes one destination argument")) 99 } 100 101 dialTimeout := dialTimeoutFromCmd(cmd) 102 keepAliveTime := keepAliveTimeFromCmd(cmd) 103 keepAliveTimeout := keepAliveTimeoutFromCmd(cmd) 104 sec := &secureCfg{ 105 cert: mmcert, 106 key: mmkey, 107 cacert: mmcacert, 108 insecureTransport: mminsecureTr, 109 } 110 111 auth := authDestCfg() 112 113 cc := &clientConfig{ 114 endpoints: []string{args[0]}, 115 dialTimeout: dialTimeout, 116 keepAliveTime: keepAliveTime, 117 keepAliveTimeout: keepAliveTimeout, 118 scfg: sec, 119 acfg: auth, 120 } 121 dc := cc.mustClient() 122 c := mustClientFromCmd(cmd) 123 124 err := makeMirror(context.TODO(), c, dc) 125 ExitWithError(ExitError, err) 126} 127 128func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) error { 129 total := int64(0) 130 131 go func() { 132 for { 133 time.Sleep(30 * time.Second) 134 fmt.Println(atomic.LoadInt64(&total)) 135 } 136 }() 137 138 s := mirror.NewSyncer(c, mmprefix, 0) 139 140 rc, errc := s.SyncBase(ctx) 141 142 // if destination prefix is specified and remove destination prefix is true return error 143 if mmnodestprefix && len(mmdestprefix) > 0 { 144 ExitWithError(ExitBadArgs, fmt.Errorf("`--dest-prefix` and `--no-dest-prefix` cannot be set at the same time, choose one")) 145 } 146 147 // if remove destination prefix is false and destination prefix is empty set the value of destination prefix same as prefix 148 if !mmnodestprefix && len(mmdestprefix) == 0 { 149 mmdestprefix = mmprefix 150 } 151 152 for r := range rc { 153 for _, kv := range r.Kvs { 154 _, err := dc.Put(ctx, modifyPrefix(string(kv.Key)), string(kv.Value)) 155 if err != nil { 156 return err 157 } 158 atomic.AddInt64(&total, 1) 159 } 160 } 161 162 err := <-errc 163 if err != nil { 164 return err 165 } 166 167 wc := s.SyncUpdates(ctx) 168 169 for wr := range wc { 170 if wr.CompactRevision != 0 { 171 return rpctypes.ErrCompacted 172 } 173 174 var lastRev int64 175 ops := []clientv3.Op{} 176 177 for _, ev := range wr.Events { 178 nextRev := ev.Kv.ModRevision 179 if lastRev != 0 && nextRev > lastRev { 180 _, err := dc.Txn(ctx).Then(ops...).Commit() 181 if err != nil { 182 return err 183 } 184 ops = []clientv3.Op{} 185 } 186 lastRev = nextRev 187 switch ev.Type { 188 case mvccpb.PUT: 189 ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value))) 190 atomic.AddInt64(&total, 1) 191 case mvccpb.DELETE: 192 ops = append(ops, clientv3.OpDelete(modifyPrefix(string(ev.Kv.Key)))) 193 atomic.AddInt64(&total, 1) 194 default: 195 panic("unexpected event type") 196 } 197 } 198 199 if len(ops) != 0 { 200 _, err := dc.Txn(ctx).Then(ops...).Commit() 201 if err != nil { 202 return err 203 } 204 } 205 } 206 207 return nil 208} 209 210func modifyPrefix(key string) string { 211 return strings.Replace(key, mmprefix, mmdestprefix, 1) 212} 213