1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "encoding/json" 19 "fmt" 20 "path" 21 "time" 22 23 "go.etcd.io/etcd/etcdserver/api" 24 "go.etcd.io/etcd/etcdserver/api/membership" 25 "go.etcd.io/etcd/etcdserver/api/v2store" 26 "go.etcd.io/etcd/pkg/pbutil" 27 28 "github.com/coreos/go-semver/semver" 29 "go.uber.org/zap" 30) 31 32// ApplierV2 is the interface for processing V2 raft messages 33type ApplierV2 interface { 34 Delete(r *RequestV2) Response 35 Post(r *RequestV2) Response 36 Put(r *RequestV2) Response 37 QGet(r *RequestV2) Response 38 Sync(r *RequestV2) Response 39} 40 41func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 { 42 return &applierV2store{lg: lg, store: s, cluster: c} 43} 44 45type applierV2store struct { 46 lg *zap.Logger 47 store v2store.Store 48 cluster *membership.RaftCluster 49} 50 51func (a *applierV2store) Delete(r *RequestV2) Response { 52 switch { 53 case r.PrevIndex > 0 || r.PrevValue != "": 54 return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) 55 default: 56 return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive)) 57 } 58} 59 60func (a *applierV2store) Post(r *RequestV2) Response { 61 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions())) 62} 63 64func (a *applierV2store) Put(r *RequestV2) Response { 65 ttlOptions := r.TTLOptions() 66 exists, existsSet := pbutil.GetBool(r.PrevExist) 67 switch { 68 case existsSet: 69 if exists { 70 if r.PrevIndex == 0 && r.PrevValue == "" { 71 return toResponse(a.store.Update(r.Path, r.Val, ttlOptions)) 72 } 73 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) 74 } 75 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) 76 case r.PrevIndex > 0 || r.PrevValue != "": 77 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) 78 default: 79 if storeMemberAttributeRegexp.MatchString(r.Path) { 80 id := membership.MustParseMemberIDFromKey(path.Dir(r.Path)) 81 var attr membership.Attributes 82 if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { 83 if a.lg != nil { 84 a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) 85 } else { 86 plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) 87 } 88 } 89 if a.cluster != nil { 90 a.cluster.UpdateAttributes(id, attr) 91 } 92 // return an empty response since there is no consumer. 93 return Response{} 94 } 95 if r.Path == membership.StoreClusterVersionKey() { 96 if a.cluster != nil { 97 a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability) 98 } 99 // return an empty response since there is no consumer. 100 return Response{} 101 } 102 return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) 103 } 104} 105 106func (a *applierV2store) QGet(r *RequestV2) Response { 107 return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted)) 108} 109 110func (a *applierV2store) Sync(r *RequestV2) Response { 111 a.store.DeleteExpiredKeys(time.Unix(0, r.Time)) 112 return Response{} 113} 114 115// applyV2Request interprets r as a call to v2store.X 116// and returns a Response interpreted from v2store.Event 117func (s *EtcdServer) applyV2Request(r *RequestV2) Response { 118 stringer := panicAlternativeStringer{ 119 stringer: r, 120 alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) }, 121 } 122 defer warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, time.Now(), stringer, nil, nil) 123 124 switch r.Method { 125 case "POST": 126 return s.applyV2.Post(r) 127 case "PUT": 128 return s.applyV2.Put(r) 129 case "DELETE": 130 return s.applyV2.Delete(r) 131 case "QGET": 132 return s.applyV2.QGet(r) 133 case "SYNC": 134 return s.applyV2.Sync(r) 135 default: 136 // This should never be reached, but just in case: 137 return Response{Err: ErrUnknownMethod} 138 } 139} 140 141func (r *RequestV2) TTLOptions() v2store.TTLOptionSet { 142 refresh, _ := pbutil.GetBool(r.Refresh) 143 ttlOptions := v2store.TTLOptionSet{Refresh: refresh} 144 if r.Expiration != 0 { 145 ttlOptions.ExpireTime = time.Unix(0, r.Expiration) 146 } 147 return ttlOptions 148} 149 150func toResponse(ev *v2store.Event, err error) Response { 151 return Response{Event: ev, Err: err} 152} 153