1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "encoding/json" 19 "fmt" 20 "path" 21 "strconv" 22 "time" 23 24 "github.com/coreos/go-semver/semver" 25 "go.etcd.io/etcd/pkg/v3/pbutil" 26 "go.etcd.io/etcd/server/v3/etcdserver/api" 27 "go.etcd.io/etcd/server/v3/etcdserver/api/membership" 28 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" 29 30 "go.uber.org/zap" 31) 32 33const v2Version = "v2" 34 35// ApplierV2 is the interface for processing V2 raft messages 36type ApplierV2 interface { 37 Delete(r *RequestV2) Response 38 Post(r *RequestV2) Response 39 Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response 40 QGet(r *RequestV2) Response 41 Sync(r *RequestV2) Response 42} 43 44func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 { 45 if lg == nil { 46 lg = zap.NewNop() 47 } 48 return &applierV2store{lg: lg, store: s, cluster: c} 49} 50 51type applierV2store struct { 52 lg *zap.Logger 53 store v2store.Store 54 cluster *membership.RaftCluster 55} 56 57func (a *applierV2store) Delete(r *RequestV2) Response { 58 switch { 59 case r.PrevIndex > 0 || r.PrevValue != "": 60 return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) 61 default: 62 return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive)) 63 } 64} 65 66func (a *applierV2store) Post(r *RequestV2) Response { 67 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions())) 68} 69 70func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response { 71 ttlOptions := r.TTLOptions() 72 exists, existsSet := pbutil.GetBool(r.PrevExist) 73 switch { 74 case existsSet: 75 if exists { 76 if r.PrevIndex == 0 && r.PrevValue == "" { 77 return toResponse(a.store.Update(r.Path, r.Val, ttlOptions)) 78 } 79 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) 80 } 81 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) 82 case r.PrevIndex > 0 || r.PrevValue != "": 83 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) 84 default: 85 if storeMemberAttributeRegexp.MatchString(r.Path) { 86 id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path)) 87 var attr membership.Attributes 88 if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { 89 a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) 90 } 91 if a.cluster != nil { 92 a.cluster.UpdateAttributes(id, attr, shouldApplyV3) 93 } 94 // return an empty response since there is no consumer. 95 return Response{} 96 } 97 // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 98 if r.Path == membership.StoreClusterVersionKey() { 99 if a.cluster != nil { 100 // persist to backend given v2store can be very stale 101 a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) 102 } 103 return Response{} 104 } 105 return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) 106 } 107} 108 109func (a *applierV2store) QGet(r *RequestV2) Response { 110 return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted)) 111} 112 113func (a *applierV2store) Sync(r *RequestV2) Response { 114 a.store.DeleteExpiredKeys(time.Unix(0, r.Time)) 115 return Response{} 116} 117 118// applyV2Request interprets r as a call to v2store.X 119// and returns a Response interpreted from v2store.Event 120func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) { 121 stringer := panicAlternativeStringer{ 122 stringer: r, 123 alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) }, 124 } 125 defer func(start time.Time) { 126 success := resp.Err == nil 127 applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) 128 warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil) 129 }(time.Now()) 130 131 switch r.Method { 132 case "POST": 133 return s.applyV2.Post(r) 134 case "PUT": 135 return s.applyV2.Put(r, shouldApplyV3) 136 case "DELETE": 137 return s.applyV2.Delete(r) 138 case "QGET": 139 return s.applyV2.QGet(r) 140 case "SYNC": 141 return s.applyV2.Sync(r) 142 default: 143 // This should never be reached, but just in case: 144 return Response{Err: ErrUnknownMethod} 145 } 146} 147 148func (r *RequestV2) TTLOptions() v2store.TTLOptionSet { 149 refresh, _ := pbutil.GetBool(r.Refresh) 150 ttlOptions := v2store.TTLOptionSet{Refresh: refresh} 151 if r.Expiration != 0 { 152 ttlOptions.ExpireTime = time.Unix(0, r.Expiration) 153 } 154 return ttlOptions 155} 156 157func toResponse(ev *v2store.Event, err error) Response { 158 return Response{Event: ev, Err: err} 159} 160